解决播放成功与中途断开事件触发紊乱的问题:#143

This commit is contained in:
xiongziliang
2019-11-19 15:52:02 +08:00
parent 85cd4a7d02
commit e6d511cc9e
5 changed files with 140 additions and 127 deletions

View File

@@ -34,44 +34,35 @@ using namespace mediakit::Client;
namespace mediakit {
unordered_map<string, RtmpPlayer::rtmpCMDHandle> RtmpPlayer::g_mapCmd;
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {
static onceToken token([]() {
g_mapCmd.emplace("_error",&RtmpPlayer::onCmd_result);
g_mapCmd.emplace("_result",&RtmpPlayer::onCmd_result);
g_mapCmd.emplace("onStatus",&RtmpPlayer::onCmd_onStatus);
g_mapCmd.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData);
}, []() {});
}
RtmpPlayer::~RtmpPlayer() {
DebugL << endl;
}
void RtmpPlayer::teardown() {
if (alive()) {
_strApp.clear();
_strStream.clear();
_strTcUrl.clear();
{
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
_mapOnResultCB.clear();
}
{
lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
_dqOnStatusCB.clear();
}
_pBeatTimer.reset();
_pPlayTimer.reset();
_pMediaTimer.reset();
_iSeekTo = 0;
CLEAR_ARR(_aiFistStamp);
CLEAR_ARR(_aiNowStamp);
reset();
shutdown(SockException(Err_shutdown,"teardown"));
shutdown(SockException(Err_shutdown,"teardown"));
}
_strApp.clear();
_strStream.clear();
_strTcUrl.clear();
_pBeatTimer.reset();
_pPlayTimer.reset();
_pMediaTimer.reset();
_iSeekTo = 0;
RtmpProtocol::reset();
CLEAR_ARR(_aiFistStamp);
CLEAR_ARR(_aiNowStamp);
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
_mapOnResultCB.clear();
lock_guard<recursive_mutex> lck2(_mtxOnStatusCB);
_dqOnStatusCB.clear();
}
void RtmpPlayer::play(const string &strUrl) {
teardown();
string strHost = FindField(strUrl.data(), "://", "/");
@@ -80,7 +71,7 @@ void RtmpPlayer::play(const string &strUrl) {
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
if (!_strApp.size() || !_strStream.size()) {
onPlayResult_l(SockException(Err_other,"rtmp url非法"));
onPlayResult_l(SockException(Err_other,"rtmp url非法"),false);
return;
}
DebugL << strHost << " " << _strApp << " " << _strStream;
@@ -104,7 +95,7 @@ void RtmpPlayer::play(const string &strUrl) {
if(!strongSelf) {
return false;
}
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"));
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"),false);
return false;
},getPoller()));
@@ -112,53 +103,52 @@ void RtmpPlayer::play(const string &strUrl) {
startConnect(strHost, iPort , playTimeOutSec);
}
void RtmpPlayer::onErr(const SockException &ex){
onPlayResult_l(ex);
//定时器_pPlayTimer为空后表明握手结束了
onPlayResult_l(ex, !_pPlayTimer);
}
void RtmpPlayer::onPlayResult_l(const SockException &ex) {
void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) {
WarnL << ex.getErrCode() << " " << ex.what();
if(!ex){
//恢复rtmp接收超时定时器
//播放成功,恢复rtmp接收超时定时器
_mediaTicker.resetTime();
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
//创建rtmp数据接收超时检测定时器
_pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) {
//recv media timeout!
strongSelf->onPlayResult_l(SockException(Err_timeout,"recv rtmp timeout"));
//接收rtmp媒体数据超时
strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtmp timeout"),true);
return false;
}
return true;
},getPoller()));
}
if (_pPlayTimer) {
if (!handshakeCompleted) {
//开始播放阶段
_pPlayTimer.reset();
onPlayResult(ex);
}else {
//播放中途阶段
if (ex) {
//播放成功后异常断开回调
onShutdown(ex);
}else{
//恢复播放
onResume();
}
}
} else if (ex) {
//播放成功后异常断开回调
onShutdown(ex);
} else {
//恢复播放
onResume();
}
if(ex){
teardown();
}
}
void RtmpPlayer::onConnect(const SockException &err){
if(err.getErrCode()!=Err_success) {
onPlayResult_l(err);
if(err.getErrCode() != Err_success) {
onPlayResult_l(err, false);
return;
}
weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
@@ -175,7 +165,8 @@ void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){
onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) {
SockException ex(Err_other, e.what());
onPlayResult_l(ex);
//定时器_pPlayTimer为空后表明握手结束了
onPlayResult_l(ex, !_pPlayTimer);
}
}
@@ -253,7 +244,7 @@ inline void RtmpPlayer::send_pause(bool bPause) {
}else{
_bPaused = bPause;
if(!bPause){
onPlayResult_l(SockException(Err_success, "rtmp resum success"));
onPlayResult_l(SockException(Err_success, "resum rtmp success"), true);
}else{
//暂停播放
_pMediaTimer.reset();
@@ -327,7 +318,7 @@ void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) {
void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) {
//TraceL << ui32StreamId;
onPlayResult_l(SockException(Err_other,"rtmp stream dry"));
onPlayResult_l(SockException(Err_other,"rtmp stream dry"), true);
}
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
@@ -343,7 +334,7 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
onMediaData(packet);
}else{
//先触发onPlayResult事件这个时候解码器才能初始化完毕
onPlayResult_l(SockException(Err_success,"play rtmp success"));
onPlayResult_l(SockException(Err_success,"play rtmp success"), false);
//触发onPlayResult事件后再把帧数据输入到解码器
onMediaData(packet);
}
@@ -351,6 +342,15 @@ void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) {
typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec);
static unordered_map<string, rtmp_func_ptr> s_func_map;
static onceToken token([]() {
s_func_map.emplace("_error",&RtmpPlayer::onCmd_result);
s_func_map.emplace("_result",&RtmpPlayer::onCmd_result);
s_func_map.emplace("onStatus",&RtmpPlayer::onCmd_onStatus);
s_func_map.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData);
}, []() {});
switch (chunkData.typeId) {
case MSG_CMD:
case MSG_CMD3:
@@ -358,8 +358,8 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) {
case MSG_DATA3: {
AMFDecoder dec(chunkData.strBuf, 0);
std::string type = dec.load<std::string>();
auto it = g_mapCmd.find(type);
if(it != g_mapCmd.end()){
auto it = s_func_map.find(type);
if(it != s_func_map.end()){
auto fun = it->second;
(this->*fun)(dec);
}else{