Demuxer/Player: 修改解复用与播放器底层逻辑,确保触发播放成功回调时不丢帧

This commit is contained in:
ziyue
2021-11-10 10:58:43 +08:00
parent 37fdb8d135
commit fdfde17ec7
14 changed files with 391 additions and 421 deletions

View File

@@ -20,9 +20,6 @@ MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) {
_poller = poller ? poller : EventPollerPool::Instance().getPoller();
}
MediaPlayer::~MediaPlayer() {
}
static void setOnCreateSocket_l(const std::shared_ptr<PlayerBase> &delegate, const Socket::onCreateSocket &cb){
auto helper = dynamic_pointer_cast<SocketHelper>(delegate);
if (helper) {
@@ -41,10 +38,10 @@ void MediaPlayer::play(const string &url) {
_delegate = PlayerBase::createPlayer(_poller, url);
assert(_delegate);
setOnCreateSocket_l(_delegate, _on_create_socket);
_delegate->setOnShutdown(_shutdownCB);
_delegate->setOnPlayResult(_playResultCB);
_delegate->setOnResume(_resumeCB);
_delegate->setMediaSource(_pMediaSrc);
_delegate->setOnShutdown(_on_shutdown);
_delegate->setOnPlayResult(_on_play_result);
_delegate->setOnResume(_on_resume);
_delegate->setMediaSource(_media_src);
_delegate->mINI::operator=(*this);
_delegate->play(url);
}
@@ -58,23 +55,4 @@ void MediaPlayer::setOnCreateSocket(Socket::onCreateSocket cb){
_on_create_socket = std::move(cb);
}
void MediaPlayer::pause(bool pause) {
if (_delegate) {
_delegate->pause(pause);
}
}
void MediaPlayer::speed(float speed) {
if (_delegate) {
_delegate->speed(speed);
}
}
void MediaPlayer::teardown() {
if (_delegate) {
_delegate->teardown();
}
}
} /* namespace mediakit */

View File

@@ -21,16 +21,14 @@ using namespace toolkit;
namespace mediakit {
class MediaPlayer : public PlayerImp<PlayerBase,PlayerBase> {
class MediaPlayer : public PlayerImp<PlayerBase, PlayerBase> {
public:
typedef std::shared_ptr<MediaPlayer> Ptr;
using Ptr = std::shared_ptr<MediaPlayer>;
MediaPlayer(const EventPoller::Ptr &poller = nullptr);
virtual ~MediaPlayer();
~MediaPlayer() override = default;
void play(const string &url) override;
void pause(bool pause) override;
void speed(float speed) override;
void teardown() override;
EventPoller::Ptr getPoller();
void setOnCreateSocket(Socket::onCreateSocket cb);

View File

@@ -13,14 +13,15 @@
#include "Rtsp/RtspPlayerImp.h"
#include "Rtmp/RtmpPlayerImp.h"
#include "Http/HlsPlayer.h"
using namespace toolkit;
namespace mediakit {
PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const string &url_in) {
static auto releasePlayer = [](PlayerBase *ptr){
onceToken token(nullptr,[&](){
delete ptr;
PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller, const string &url_in) {
static auto releasePlayer = [](PlayerBase *ptr) {
onceToken token(nullptr, [&]() {
delete ptr;
});
ptr->teardown();
};
@@ -32,98 +33,91 @@ PlayerBase::Ptr PlayerBase::createPlayer(const EventPoller::Ptr &poller,const st
url = url.substr(0, pos);
}
if (strcasecmp("rtsps",prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtspPlayerImp>(poller),releasePlayer);
if (strcasecmp("rtsps", prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtspPlayerImp>(poller), releasePlayer);
}
if (strcasecmp("rtsp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
if (strcasecmp("rtsp", prefix.data()) == 0) {
return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer);
}
if (strcasecmp("rtmps",prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtmpPlayerImp>(poller),releasePlayer);
if (strcasecmp("rtmps", prefix.data()) == 0) {
return PlayerBase::Ptr(new TcpClientWithSSL<RtmpPlayerImp>(poller), releasePlayer);
}
if (strcasecmp("rtmp",prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(poller),releasePlayer);
if (strcasecmp("rtmp", prefix.data()) == 0) {
return PlayerBase::Ptr(new RtmpPlayerImp(poller), releasePlayer);
}
if ((strcasecmp("http",prefix.data()) == 0 || strcasecmp("https",prefix.data()) == 0) && end_with(url, ".m3u8")) {
return PlayerBase::Ptr(new HlsPlayerImp(poller),releasePlayer);
if ((strcasecmp("http", prefix.data()) == 0 || strcasecmp("https", prefix.data()) == 0) && end_with(url, ".m3u8")) {
return PlayerBase::Ptr(new HlsPlayerImp(poller), releasePlayer);
}
return PlayerBase::Ptr(new RtspPlayerImp(poller),releasePlayer);
return PlayerBase::Ptr(new RtspPlayerImp(poller), releasePlayer);
}
PlayerBase::PlayerBase() {
this->mINI::operator[](kTimeoutMS) = 10000;
this->mINI::operator[](kMediaTimeoutMS) = 5000;
this->mINI::operator[](kBeatIntervalMS) = 5000;
this->mINI::operator[](kMaxAnalysisMS) = 5000;
}
///////////////////////////Demuxer//////////////////////////////
bool Demuxer::isInited(int analysisMs) {
if(analysisMs && _ticker.createdTime() > (uint64_t)analysisMs){
//analysisMs毫秒后强制初始化完毕
return true;
}
if (_videoTrack && !_videoTrack->ready()) {
//视频未准备好
return false;
}
if (_audioTrack && !_audioTrack->ready()) {
//音频未准备好
return false;
}
return true;
}
vector<Track::Ptr> Demuxer::getTracks(bool trackReady) const {
vector<Track::Ptr> ret;
if(_videoTrack){
if(trackReady){
if(_videoTrack->ready()){
ret.emplace_back(_videoTrack);
}
}else{
ret.emplace_back(_videoTrack);
}
}
if(_audioTrack){
if(trackReady){
if(_audioTrack->ready()){
ret.emplace_back(_audioTrack);
}
}else{
ret.emplace_back(_audioTrack);
}
bool Demuxer::addTrack(const Track::Ptr &track) {
auto ret = MediaSink::addTrack(track);
if (ret) {
track->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
return inputFrame(frame);
}));
}
return ret;
}
float Demuxer::getDuration() const {
return _fDuration;
}
bool Demuxer::addTrack(const Track::Ptr &track){
return _listener ? _listener->addTrack(track) : false;
}
void Demuxer::addTrackCompleted(){
if(_listener){
_listener->addTrackCompleted();
}
}
void Demuxer::resetTracks() {
if (_listener) {
_listener->resetTracks();
}
}
void Demuxer::setTrackListener(TrackListener *listener) {
_listener = listener;
}
bool Demuxer::onTrackReady(const Track::Ptr &track) {
_tracks[track->getTrackType()] = track->clone();
return true;
}
void Demuxer::onAllTrackReady() {
if (!_listener) {
return;
}
for (auto &track : _tracks) {
if (track) {
_listener->addTrack(track);
}
}
_listener->addTrackCompleted();
}
bool Demuxer::onTrackFrame(const Frame::Ptr &frame) {
return _tracks[frame->getTrackType()]->inputFrame(frame);
}
void Demuxer::resetTracks() {
MediaSink::resetTracks();
for (auto &track : _tracks) {
track = nullptr;
}
if (_listener) {
_listener->resetTracks();
}
}
vector<Track::Ptr> Demuxer::getTracks(bool ready) const {
vector<Track::Ptr> ret;
for (auto &track : _tracks) {
if (!track || (ready && !track->ready())) {
continue;
}
ret.emplace_back(track);
}
return ret;
}
} /* namespace mediakit */

View File

@@ -22,292 +22,242 @@
#include "Common/MediaSink.h"
#include "Extension/Frame.h"
#include "Extension/Track.h"
using namespace toolkit;
namespace mediakit {
class DemuxerBase : public TrackSource{
class PlayerBase : public TrackSource, public mINI {
public:
typedef std::shared_ptr<DemuxerBase> Ptr;
using Ptr = std::shared_ptr<PlayerBase>;
using Event = std::function<void(const SockException &ex)>;
/**
* 获取节目总时长,单位秒
* @return
*/
virtual float getDuration() const { return 0;}
/**
* 是否初始化完毕完毕后方可调用getTrack方法
* @param analysisMs 数据流最大分析时间 单位毫秒
* @return
*/
virtual bool isInited(int analysisMs) { return true; }
};
class PlayerBase : public DemuxerBase, public mINI{
public:
typedef std::shared_ptr<PlayerBase> Ptr;
static Ptr createPlayer(const EventPoller::Ptr &poller,const string &strUrl);
static Ptr createPlayer(const EventPoller::Ptr &poller, const string &strUrl);
PlayerBase();
virtual ~PlayerBase(){}
~PlayerBase() override = default;
/**
* 开始播放
* @param strUrl 视频url支持rtsp/rtmp
* @param url 视频url支持rtsp/rtmp
*/
virtual void play(const string &strUrl) {}
virtual void play(const string &url) {};
/**
* 暂停或恢复
* @param bPause
* @param flag true:暂停false:恢复
*/
virtual void pause(bool bPause) {}
virtual void pause(bool flag) {};
/**
* 获取节目总时长,单位秒
*/
virtual float getDuration() const { return 0; };
/**
* 倍数播放
* @param speed 1.0 2.0 0.5
*/
virtual void speed(float speed) {}
virtual void speed(float speed) {};
/**
* 中断播放
*/
virtual void teardown() {}
/**
* 设置异常中断回调
* @param cb
*/
virtual void setOnShutdown( const function<void(const SockException &)> &cb) {}
/**
* 设置播放结果回调
* @param cb
*/
virtual void setOnPlayResult( const function<void(const SockException &ex)> &cb) {}
/**
* 设置播放恢复回调
* @param cb
*/
virtual void setOnResume( const function<void()> &cb) {}
virtual void teardown() {};
/**
* 获取播放进度,取值 0.0 ~ 1.0
* @return
*/
virtual float getProgress() const { return 0;}
virtual float getProgress() const { return 0; };
/**
* 获取播放进度pos取值 相对开始时间增量 单位秒
* @return
*/
virtual uint32_t getProgressPos() const { return 0; }
virtual uint32_t getProgressPos() const { return 0; };
/**
* 拖动进度条
* @param fProgress 进度,取值 0.0 ~ 1.0
* @param progress 进度,取值 0.0 ~ 1.0
*/
virtual void seekTo(float fProgress) {}
virtual void seekTo(float progress) {};
/**
* 拖动进度条
* @param seekPos 进度,取值 相对于开始时间的增量 单位秒
* @param pos 进度,取值 相对于开始时间的增量 单位秒
*/
virtual void seekTo(uint32_t seekPos) {}
/**
* 设置一个MediaSource直接生产rtsp/rtmp代理
* @param src
*/
virtual void setMediaSource(const MediaSource::Ptr & src) {}
virtual void seekTo(uint32_t pos) {};
/**
* 获取丢包率只支持rtsp
* @param trackType 音频或视频TrackInvalid时为总丢包率
* @return
* @param type 音频或视频TrackInvalid时为总丢包率
*/
virtual float getPacketLossRate(TrackType trackType) const {return 0; }
virtual float getPacketLossRate(TrackType type) const { return 0; };
/**
* 获取所有track
*/
vector<Track::Ptr> getTracks(bool trackReady = true) const override{
return vector<Track::Ptr>();
}
protected:
virtual void onShutdown(const SockException &ex) {}
virtual void onPlayResult(const SockException &ex) {}
vector<Track::Ptr> getTracks(bool ready = true) const override { return vector<Track::Ptr>(); };
/**
* 暂停后恢复播放时间
* 设置一个MediaSource直接生产rtsp/rtmp代理
*/
virtual void onResume(){};
virtual void setMediaSource(const MediaSource::Ptr &src) = 0;
/**
* 设置异常中断回调
*/
virtual void setOnShutdown(const Event &cb) = 0;
/**
* 设置播放结果回调
*/
virtual void setOnPlayResult(const Event &cb) = 0;
/**
* 设置播放恢复回调
*/
virtual void setOnResume(const function<void()> &cb) = 0;
protected:
virtual void onResume() = 0;
virtual void onShutdown(const SockException &ex) = 0;
virtual void onPlayResult(const SockException &ex) = 0;
};
template<typename Parent,typename Delegate>
template<typename Parent, typename Delegate>
class PlayerImp : public Parent {
public:
typedef std::shared_ptr<PlayerImp> Ptr;
using Ptr = std::shared_ptr<PlayerImp>;
template<typename ...ArgsType>
PlayerImp(ArgsType &&...args):Parent(std::forward<ArgsType>(args)...) {}
virtual ~PlayerImp() {}
PlayerImp(ArgsType &&...args) : Parent(std::forward<ArgsType>(args)...) {}
~PlayerImp() override = default;
void setOnShutdown(const function<void(const SockException &)> &cb) override {
if (_delegate) {
_delegate->setOnShutdown(cb);
}
_shutdownCB = cb;
void play(const string &url) override {
return _delegate ? _delegate->play(url) : Parent::play(url);
}
void setOnPlayResult(const function<void(const SockException &ex)> &cb) override {
if (_delegate) {
_delegate->setOnPlayResult(cb);
}
_playResultCB = cb;
void pause(bool flag) override {
return _delegate ? _delegate->pause(flag) : Parent::pause(flag);
}
void setOnResume(const function<void()> &cb) override {
if (_delegate) {
_delegate->setOnResume(cb);
}
_resumeCB = cb;
void speed(float speed) override {
return _delegate ? _delegate->speed(speed) : Parent::speed(speed);
}
bool isInited(int analysisMs) override {
if (_delegate) {
return _delegate->isInited(analysisMs);
}
return Parent::isInited(analysisMs);
void teardown() override {
return _delegate ? _delegate->teardown() : Parent::teardown();
}
float getPacketLossRate(TrackType type) const override {
return _delegate ? _delegate->getPacketLossRate(type) : Parent::getPacketLossRate(type);
}
float getDuration() const override {
if (_delegate) {
return _delegate->getDuration();
}
return Parent::getDuration();
return _delegate ? _delegate->getDuration() : Parent::getDuration();
}
float getProgress() const override {
if (_delegate) {
return _delegate->getProgress();
}
return Parent::getProgress();
return _delegate ? _delegate->getProgress() : Parent::getProgress();
}
uint32_t getProgressPos() const override {
if (_delegate) {
return _delegate->getProgressPos();
}
return Parent::getProgressPos();
return _delegate ? _delegate->getProgressPos() : Parent::getProgressPos();
}
void seekTo(float fProgress) override {
if (_delegate) {
return _delegate->seekTo(fProgress);
}
return Parent::seekTo(fProgress);
void seekTo(float progress) override {
return _delegate ? _delegate->seekTo(progress) : Parent::seekTo(progress);
}
void seekTo(uint32_t seekPos) override {
if (_delegate) {
return _delegate->seekTo(seekPos);
}
return Parent::seekTo(seekPos);
void seekTo(uint32_t pos) override {
return _delegate ? _delegate->seekTo(pos) : Parent::seekTo(pos);
}
void setMediaSource(const MediaSource::Ptr &src) override {
if (_delegate) {
_delegate->setMediaSource(src);
}
_pMediaSrc = src;
}
vector<Track::Ptr> getTracks(bool trackReady = true) const override {
if (_delegate) {
return _delegate->getTracks(trackReady);
}
return Parent::getTracks(trackReady);
vector<Track::Ptr> getTracks(bool ready = true) const override {
return _delegate ? _delegate->getTracks(ready) : Parent::getTracks(ready);
}
std::shared_ptr<SockInfo> getSockInfo() const {
return dynamic_pointer_cast<SockInfo>(_delegate);
}
void setMediaSource(const MediaSource::Ptr &src) override {
if (_delegate) {
_delegate->setMediaSource(src);
}
_media_src = src;
}
void setOnShutdown(const function<void(const SockException &)> &cb) override {
if (_delegate) {
_delegate->setOnShutdown(cb);
}
_on_shutdown = cb;
}
void setOnPlayResult(const function<void(const SockException &ex)> &cb) override {
if (_delegate) {
_delegate->setOnPlayResult(cb);
}
_on_play_result = cb;
}
void setOnResume(const function<void()> &cb) override {
if (_delegate) {
_delegate->setOnResume(cb);
}
_on_resume = cb;
}
protected:
void onShutdown(const SockException &ex) override {
if (_shutdownCB) {
_shutdownCB(ex);
_shutdownCB = nullptr;
if (_on_shutdown) {
_on_shutdown(ex);
_on_shutdown = nullptr;
}
}
void onPlayResult(const SockException &ex) override {
if(_playResultCB) {
_playResultCB(ex);
_playResultCB = nullptr;
if (_on_play_result) {
_on_play_result(ex);
_on_play_result = nullptr;
}
}
void onResume() override{
if(_resumeCB){
_resumeCB();
void onResume() override {
if (_on_resume) {
_on_resume();
}
}
protected:
function<void(const SockException &ex)> _shutdownCB;
function<void(const SockException &ex)> _playResultCB;
function<void()> _resumeCB;
function<void()> _on_resume;
PlayerBase::Event _on_shutdown;
PlayerBase::Event _on_play_result;
MediaSource::Ptr _media_src;
std::shared_ptr<Delegate> _delegate;
MediaSource::Ptr _pMediaSrc;
};
class Demuxer : public PlayerBase, public TrackListener{
class Demuxer : protected MediaSink {
public:
Demuxer() = default;
~Demuxer() override = default;
/**
* 返回是否完成初始化完毕
* 在构造RtspDemuxer对象时有些rtsp的sdp不包含sps pps信息
* 所以要等待接收到到sps的rtp包后才能完成
*
* 在构造RtmpDemuxer对象时是无法获取sps pps aac_cfg等这些信息
* 所以要调用inputRtmp后才会获取到这些信息这时才初始化成功
* @param analysisMs 数据流最大分析时间 单位毫秒
* @return
*/
bool isInited(int analysisMs) override;
/**
* 获取所有Track
* @return 所有Track
*/
vector<Track::Ptr> getTracks(bool trackReady = true) const override;
/**
* 获取节目总时长
* @return 节目总时长,单位秒
*/
float getDuration() const override;
/**
* 设置track监听器
*/
void setTrackListener(TrackListener *listener);
vector<Track::Ptr> getTracks(bool ready = true) const override;
protected:
bool addTrack(const Track::Ptr &track) override;
void addTrackCompleted() override;
bool addTrack(const Track::Ptr & track) override;
void resetTracks() override;
bool onTrackReady(const Track::Ptr & track) override;
void onAllTrackReady() override;
bool onTrackFrame(const Frame::Ptr &frame) override;
protected:
float _fDuration = 0;
Ticker _ticker;
AudioTrack::Ptr _audioTrack;
VideoTrack::Ptr _videoTrack;
private:
Track::Ptr _tracks[TrackMax];
TrackListener *_listener = nullptr;
};

View File

@@ -160,7 +160,7 @@ bool PlayerProxy::close(MediaSource &sender, bool force) {
}
int PlayerProxy::totalReaderCount() {
return (_muxer ? _muxer->totalReaderCount() : 0) + (_pMediaSrc ? _pMediaSrc->readerCount() : 0);
return (_muxer ? _muxer->totalReaderCount() : 0) + (_media_src ? _media_src->readerCount() : 0);
}
int PlayerProxy::totalReaderCount(MediaSource &sender) {
@@ -181,12 +181,12 @@ std::shared_ptr<SockInfo> PlayerProxy::getOriginSock(MediaSource &sender) const
void PlayerProxy::onPlaySuccess() {
GET_CONFIG(bool, resetWhenRePlay, General::kResetWhenRePlay);
if (dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc)) {
if (dynamic_pointer_cast<RtspMediaSource>(_media_src)) {
//rtsp拉流代理
if (resetWhenRePlay || !_muxer) {
_muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), false, true, _enable_hls, _enable_mp4));
}
} else if (dynamic_pointer_cast<RtmpMediaSource>(_pMediaSrc)) {
} else if (dynamic_pointer_cast<RtmpMediaSource>(_media_src)) {
//rtmp拉流代理
if (resetWhenRePlay || !_muxer) {
_muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), true, false, _enable_hls, _enable_mp4));
@@ -218,9 +218,9 @@ void PlayerProxy::onPlaySuccess() {
//添加完毕所有track防止单track情况下最大等待3秒
_muxer->addTrackCompleted();
if (_pMediaSrc) {
if (_media_src) {
//让_muxer对象拦截一部分事件(比如说录像相关事件)
_pMediaSrc->setListener(_muxer);
_media_src->setListener(_muxer);
}
}