整理代码

This commit is contained in:
xiongziliang
2020-08-30 10:48:34 +08:00
parent 4255914613
commit fbd711a6bb
28 changed files with 1052 additions and 1011 deletions

View File

@@ -18,8 +18,7 @@ using namespace mediakit::Client;
namespace mediakit {
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {
}
RtmpPlayer::RtmpPlayer(const EventPoller::Ptr &poller) : TcpClient(poller) {}
RtmpPlayer::~RtmpPlayer() {
DebugL << endl;
@@ -29,97 +28,82 @@ void RtmpPlayer::teardown() {
if (alive()) {
shutdown(SockException(Err_shutdown,"teardown"));
}
_strApp.clear();
_strStream.clear();
_strTcUrl.clear();
_pBeatTimer.reset();
_pPlayTimer.reset();
_pMediaTimer.reset();
_iSeekTo = 0;
_app.clear();
_stream_id.clear();
_tc_url.clear();
_beat_timer.reset();
_play_timer.reset();
_rtmp_recv_timer.reset();
_seek_ms = 0;
RtmpProtocol::reset();
CLEAR_ARR(_aiFistStamp);
CLEAR_ARR(_aiNowStamp);
CLEAR_ARR(_fist_stamp);
CLEAR_ARR(_now_stamp);
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
_mapOnResultCB.clear();
lock_guard<recursive_mutex> lck2(_mtxOnStatusCB);
_dqOnStatusCB.clear();
lock_guard<recursive_mutex> lck(_mtx_on_result);
_map_on_result.clear();
lock_guard<recursive_mutex> lck2(_mtx_on_status);
_deque_on_status.clear();
}
void RtmpPlayer::play(const string &strUrl) {
teardown();
string strHost = FindField(strUrl.data(), "://", "/");
_strApp = FindField(strUrl.data(), (strHost + "/").data(), "/");
_strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
string host_url = FindField(strUrl.data(), "://", "/");
_app = FindField(strUrl.data(), (host_url + "/").data(), "/");
_stream_id = FindField(strUrl.data(), (host_url + "/" + _app + "/").data(), NULL);
_tc_url = string("rtmp://") + host_url + "/" + _app;
if (!_strApp.size() || !_strStream.size()) {
onPlayResult_l(SockException(Err_other,"rtmp url非法"),false);
if (!_app.size() || !_stream_id.size()) {
onPlayResult_l(SockException(Err_other, "rtmp url非法"), false);
return;
}
DebugL << strHost << " " << _strApp << " " << _strStream;
DebugL << host_url << " " << _app << " " << _stream_id;
auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
auto iPort = atoi(FindField(host_url.data(), ":", NULL).data());
if (iPort <= 0) {
//rtmp 默认端口1935
iPort = 1935;
} else {
//服务器域名
strHost = FindField(strHost.data(), NULL, ":");
host_url = FindField(host_url.data(), NULL, ":");
}
if(!(*this)[kNetAdapter].empty()){
if (!(*this)[kNetAdapter].empty()) {
setNetAdapter((*this)[kNetAdapter]);
}
weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
float playTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
_pPlayTimer.reset( new Timer(playTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
weak_ptr<RtmpPlayer> weak_self = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
float play_timeout_sec = (*this)[kTimeoutMS].as<int>() / 1000.0;
_play_timer.reset(new Timer(play_timeout_sec, [weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
strongSelf->onPlayResult_l(SockException(Err_timeout,"play rtmp timeout"),false);
strong_self->onPlayResult_l(SockException(Err_timeout, "play rtmp timeout"), false);
return false;
},getPoller()));
}, getPoller()));
_metadata_got = false;
startConnect(strHost, iPort , playTimeOutSec);
startConnect(host_url, iPort, play_timeout_sec);
}
void RtmpPlayer::onErr(const SockException &ex){
//定时器_pPlayTimer为空后表明握手结束了
onPlayResult_l(ex, !_pPlayTimer);
onPlayResult_l(ex, !_play_timer);
}
void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeCompleted) {
WarnL << ex.getErrCode() << " " << ex.what();
if(!ex){
//播放成功恢复rtmp接收超时定时器
_mediaTicker.resetTime();
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
int timeoutMS = (*this)[kMediaTimeoutMS].as<int>();
//创建rtmp数据接收超时检测定时器
_pMediaTimer.reset( new Timer(timeoutMS / 2000.0, [weakSelf,timeoutMS]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
if(strongSelf->_mediaTicker.elapsedTime()> timeoutMS) {
//接收rtmp媒体数据超时
strongSelf->onPlayResult_l(SockException(Err_timeout,"receive rtmp timeout"),true);
return false;
}
return true;
},getPoller()));
void RtmpPlayer::onPlayResult_l(const SockException &ex, bool handshakeCompleted) {
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;
}
WarnL << ex.getErrCode() << " " << ex.what();
if (!handshakeCompleted) {
//开始播放阶段
_pPlayTimer.reset();
onPlayResult(ex);
_play_timer.reset();
//是否为性能测试模式
_benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
onPlayResult(ex);
} else if (ex) {
//播放成功后异常断开回调
onShutdown(ex);
@@ -128,36 +112,58 @@ void RtmpPlayer::onPlayResult_l(const SockException &ex , bool handshakeComplete
onResume();
}
if(ex){
if (!ex) {
//播放成功恢复rtmp接收超时定时器
_rtmp_recv_ticker.resetTime();
int timeout_ms = (*this)[kMediaTimeoutMS].as<int>();
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
auto lam = [weakSelf, timeout_ms]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
if (strongSelf->_rtmp_recv_ticker.elapsedTime() > timeout_ms) {
//接收rtmp媒体数据超时
SockException ex(Err_timeout, "receive rtmp timeout");
strongSelf->onPlayResult_l(ex, true);
return false;
}
return true;
};
//创建rtmp数据接收超时检测定时器
_rtmp_recv_timer = std::make_shared<Timer>(timeout_ms / 2000.0, lam, getPoller());
} else {
teardown();
}
}
void RtmpPlayer::onConnect(const SockException &err){
if(err.getErrCode() != Err_success) {
if (err.getErrCode() != Err_success) {
onPlayResult_l(err, false);
return;
}
weak_ptr<RtmpPlayer> weakSelf= dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
startClientSession([weakSelf]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return;
}
strongSelf->send_connect();
});
}
void RtmpPlayer::onRecv(const Buffer::Ptr &pBuf){
try {
if(_benchmark_mode && !_pPlayTimer){
if (_benchmark_mode && !_play_timer) {
//在性能测试模式下如果rtmp握手完毕后不再解析rtmp包
_mediaTicker.resetTime();
_rtmp_recv_ticker.resetTime();
return;
}
onParseRtmp(pBuf->data(), pBuf->size());
} catch (exception &e) {
SockException ex(Err_other, e.what());
//定时器_pPlayTimer为空后表明握手结束了
onPlayResult_l(ex, !_pPlayTimer);
onPlayResult_l(ex, !_play_timer);
}
}
@@ -167,8 +173,8 @@ void RtmpPlayer::pause(bool bPause) {
inline void RtmpPlayer::send_connect() {
AMFValue obj(AMF_OBJECT);
obj.set("app", _strApp);
obj.set("tcUrl", _strTcUrl);
obj.set("app", _app);
obj.set("tcUrl", _tc_url);
//未使用代理
obj.set("fpad", false);
//参考librtmp,什么作用?
@@ -176,18 +182,18 @@ inline void RtmpPlayer::send_connect() {
//SUPPORT_VID_CLIENT_SEEK 支持seek
obj.set("videoFunction", 1);
//只支持aac
obj.set("audioCodecs", (double)(0x0400));
obj.set("audioCodecs", (double) (0x0400));
//只支持H264
obj.set("videoCodecs", (double)(0x0080));
obj.set("videoCodecs", (double) (0x0080));
sendInvoke("connect", obj);
addOnResultCB([this](AMFDecoder &dec){
addOnResultCB([this](AMFDecoder &dec) {
//TraceL << "connect result";
dec.load<AMFValue>();
auto val = dec.load<AMFValue>();
auto level = val["level"].as_string();
auto code = val["code"].as_string();
if(level != "status"){
throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
if (level != "status") {
throw std::runtime_error(StrPrinter << "connect 失败:" << level << " " << code << endl);
}
send_createStream();
});
@@ -196,24 +202,24 @@ inline void RtmpPlayer::send_connect() {
inline void RtmpPlayer::send_createStream() {
AMFValue obj(AMF_NULL);
sendInvoke("createStream", obj);
addOnResultCB([this](AMFDecoder &dec){
addOnResultCB([this](AMFDecoder &dec) {
//TraceL << "createStream result";
dec.load<AMFValue>();
_ui32StreamId = dec.load<int>();
_stream_index = dec.load<int>();
send_play();
});
}
inline void RtmpPlayer::send_play() {
AMFEncoder enc;
enc << "play" << ++_iReqID << nullptr << _strStream << (double)_ui32StreamId;
enc << "play" << ++_send_req_id << nullptr << _stream_id << (double) _stream_index;
sendRequest(MSG_CMD, enc.data());
auto fun = [this](AMFValue &val){
auto fun = [this](AMFValue &val) {
//TraceL << "play onStatus";
auto level = val["level"].as_string();
auto code = val["code"].as_string();
if(level != "status"){
throw std::runtime_error(StrPrinter <<"play 失败:" << level << " " << code << endl);
if (level != "status") {
throw std::runtime_error(StrPrinter << "play 失败:" << level << " " << code << endl);
}
};
addOnStatusCB(fun);
@@ -222,76 +228,77 @@ inline void RtmpPlayer::send_play() {
inline void RtmpPlayer::send_pause(bool bPause) {
AMFEncoder enc;
enc << "pause" << ++_iReqID << nullptr << bPause;
enc << "pause" << ++_send_req_id << nullptr << bPause;
sendRequest(MSG_CMD, enc.data());
auto fun = [this,bPause](AMFValue &val){
auto fun = [this, bPause](AMFValue &val) {
//TraceL << "pause onStatus";
auto level = val["level"].as_string();
auto code = val["code"].as_string();
if(level != "status") {
if(!bPause){
throw std::runtime_error(StrPrinter <<"pause 恢复播放失败:" << level << " " << code << endl);
if (level != "status") {
if (!bPause) {
throw std::runtime_error(StrPrinter << "pause 恢复播放失败:" << level << " " << code << endl);
}
}else{
_bPaused = bPause;
if(!bPause){
} else {
_paused = bPause;
if (!bPause) {
onPlayResult_l(SockException(Err_success, "resum rtmp success"), true);
}else{
} else {
//暂停播放
_pMediaTimer.reset();
_rtmp_recv_timer.reset();
}
}
};
addOnStatusCB(fun);
_pBeatTimer.reset();
if(bPause){
_beat_timer.reset();
if (bPause) {
weak_ptr<RtmpPlayer> weakSelf = dynamic_pointer_cast<RtmpPlayer>(shared_from_this());
_pBeatTimer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0,[weakSelf](){
_beat_timer.reset(new Timer((*this)[kBeatIntervalMS].as<int>() / 1000.0, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf){
if (!strongSelf) {
return false;
}
uint32_t timeStamp = ::time(NULL);
strongSelf->sendUserControl(CONTROL_PING_REQUEST, timeStamp);
return true;
},getPoller()));
}, getPoller()));
}
}
void RtmpPlayer::onCmd_result(AMFDecoder &dec){
auto iReqId = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
auto it = _mapOnResultCB.find(iReqId);
if(it != _mapOnResultCB.end()){
auto req_id = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtx_on_result);
auto it = _map_on_result.find(req_id);
if (it != _map_on_result.end()) {
it->second(dec);
_mapOnResultCB.erase(it);
}else{
_map_on_result.erase(it);
} else {
WarnL << "unhandled _result";
}
}
void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) {
AMFValue val;
while(true){
while (true) {
val = dec.load<AMFValue>();
if(val.type() == AMF_OBJECT){
if (val.type() == AMF_OBJECT) {
break;
}
}
if(val.type() != AMF_OBJECT){
if (val.type() != AMF_OBJECT) {
throw std::runtime_error("onStatus:the result object was not found");
}
lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
if(_dqOnStatusCB.size()){
_dqOnStatusCB.front()(val);
_dqOnStatusCB.pop_front();
}else{
lock_guard<recursive_mutex> lck(_mtx_on_status);
if (_deque_on_status.size()) {
_deque_on_status.front()(val);
_deque_on_status.pop_front();
} else {
auto level = val["level"];
auto code = val["code"].as_string();
if(level.type() == AMF_STRING){
if(level.as_string() != "status"){
throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
if (level.type() == AMF_STRING) {
if (level.as_string() != "status") {
throw std::runtime_error(StrPrinter << "onStatus 失败:" << level.as_string() << " " << code << endl);
}
}
//WarnL << "unhandled onStatus:" << code;
@@ -301,31 +308,31 @@ void RtmpPlayer::onCmd_onStatus(AMFDecoder &dec) {
void RtmpPlayer::onCmd_onMetaData(AMFDecoder &dec) {
//TraceL;
auto val = dec.load<AMFValue>();
if(!onCheckMeta(val)){
if (!onCheckMeta(val)) {
throw std::runtime_error("onCheckMeta failed");
}
_metadata_got = true;
}
void RtmpPlayer::onStreamDry(uint32_t ui32StreamId) {
//TraceL << ui32StreamId;
onPlayResult_l(SockException(Err_other,"rtmp stream dry"), true);
void RtmpPlayer::onStreamDry(uint32_t stream_id) {
//TraceL << stream_id;
onPlayResult_l(SockException(Err_other, "rtmp stream dry"), true);
}
void RtmpPlayer::onMediaData_l(const RtmpPacket::Ptr &packet) {
_mediaTicker.resetTime();
if(!_pPlayTimer){
_rtmp_recv_ticker.resetTime();
if (!_play_timer) {
//已经触发了onPlayResult事件直接触发onMediaData事件
onMediaData(packet);
return;
}
if(packet->isCfgFrame()){
if (packet->isCfgFrame()) {
//输入配置帧以便初始化完成各个track
onMediaData(packet);
}else{
} else {
//先触发onPlayResult事件这个时候解码器才能初始化完毕
onPlayResult_l(SockException(Err_success,"play rtmp success"), false);
onPlayResult_l(SockException(Err_success, "play rtmp success"), false);
//触发onPlayResult事件后再把帧数据输入到解码器
onMediaData(packet);
}
@@ -336,77 +343,76 @@ void RtmpPlayer::onRtmpChunk(RtmpPacket &chunkData) {
typedef void (RtmpPlayer::*rtmp_func_ptr)(AMFDecoder &dec);
static unordered_map<string, rtmp_func_ptr> s_func_map;
static onceToken token([]() {
s_func_map.emplace("_error",&RtmpPlayer::onCmd_result);
s_func_map.emplace("_result",&RtmpPlayer::onCmd_result);
s_func_map.emplace("onStatus",&RtmpPlayer::onCmd_onStatus);
s_func_map.emplace("onMetaData",&RtmpPlayer::onCmd_onMetaData);
}, []() {});
s_func_map.emplace("_error", &RtmpPlayer::onCmd_result);
s_func_map.emplace("_result", &RtmpPlayer::onCmd_result);
s_func_map.emplace("onStatus", &RtmpPlayer::onCmd_onStatus);
s_func_map.emplace("onMetaData", &RtmpPlayer::onCmd_onMetaData);
});
switch (chunkData.typeId) {
switch (chunkData.type_id) {
case MSG_CMD:
case MSG_CMD3:
case MSG_DATA:
case MSG_DATA3: {
AMFDecoder dec(chunkData.strBuf, 0);
AMFDecoder dec(chunkData.buffer, 0);
std::string type = dec.load<std::string>();
auto it = s_func_map.find(type);
if(it != s_func_map.end()){
if (it != s_func_map.end()) {
auto fun = it->second;
(this->*fun)(dec);
}else{
} else {
WarnL << "can not support cmd:" << type;
}
}
break;
}
case MSG_AUDIO:
case MSG_VIDEO: {
auto idx = chunkData.typeId%2;
if (_aNowStampTicker[idx].elapsedTime() > 500) {
auto idx = chunkData.type_id % 2;
if (_now_stamp_ticker[idx].elapsedTime() > 500) {
//计算播放进度时间轴用
_aiNowStamp[idx] = chunkData.timeStamp;
_now_stamp[idx] = chunkData.time_stamp;
}
if(!_metadata_got){
if(!onCheckMeta(TitleMeta().getMetadata())){
if (!_metadata_got) {
if (!onCheckMeta(TitleMeta().getMetadata())) {
throw std::runtime_error("onCheckMeta failed");
}
_metadata_got = true;
}
onMediaData_l(std::make_shared<RtmpPacket>(std::move(chunkData)));
}
break;
default:
//WarnL << "unhandled message:" << (int) chunkData.typeId << hexdump(chunkData.strBuf.data(), chunkData.strBuf.size());
break;
}
default: break;
}
}
uint32_t RtmpPlayer::getProgressMilliSecond() const{
uint32_t iTime[2] = {0,0};
for(auto i = 0 ;i < 2 ;i++){
iTime[i] = _aiNowStamp[i] - _aiFistStamp[i];
uint32_t stamp[2] = {0, 0};
for (auto i = 0; i < 2; i++) {
stamp[i] = _now_stamp[i] - _fist_stamp[i];
}
return _iSeekTo + MAX(iTime[0],iTime[1]);
return _seek_ms + MAX(stamp[0], stamp[1]);
}
void RtmpPlayer::seekToMilliSecond(uint32_t seekMS){
if (_bPaused) {
if (_paused) {
pause(false);
}
AMFEncoder enc;
enc << "seek" << ++_iReqID << nullptr << seekMS * 1.0;
enc << "seek" << ++_send_req_id << nullptr << seekMS * 1.0;
sendRequest(MSG_CMD, enc.data());
addOnStatusCB([this,seekMS](AMFValue &val) {
addOnStatusCB([this, seekMS](AMFValue &val) {
//TraceL << "seek result";
_aNowStampTicker[0].resetTime();
_aNowStampTicker[1].resetTime();
_now_stamp_ticker[0].resetTime();
_now_stamp_ticker[1].resetTime();
int iTimeInc = seekMS - getProgressMilliSecond();
for(auto i = 0 ;i < 2 ;i++){
_aiFistStamp[i] = _aiNowStamp[i] + iTimeInc;
_aiNowStamp[i] = _aiFistStamp[i];
for (auto i = 0; i < 2; i++) {
_fist_stamp[i] = _now_stamp[i] + iTimeInc;
_now_stamp[i] = _fist_stamp[i];
}
_iSeekTo = seekMS;
_seek_ms = seekMS;
});
}
} /* namespace mediakit */