整理代码

This commit is contained in:
xiongziliang
2020-08-30 11:40:03 +08:00
parent fbd711a6bb
commit a7e99b9d37
10 changed files with 438 additions and 419 deletions

View File

@@ -18,141 +18,147 @@ using namespace mediakit::Client;
namespace mediakit {
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller,const RtmpMediaSource::Ptr &src) : TcpClient(poller){
_pMediaSrc=src;
RtmpPusher::RtmpPusher(const EventPoller::Ptr &poller, const RtmpMediaSource::Ptr &src) : TcpClient(poller){
_publish_src = src;
}
RtmpPusher::~RtmpPusher() {
teardown();
DebugL << endl;
}
void RtmpPusher::teardown() {
if (alive()) {
_strApp.clear();
_strStream.clear();
_strTcUrl.clear();
_app.clear();
_stream_id.clear();
_tc_url.clear();
{
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
_mapOnResultCB.clear();
lock_guard<recursive_mutex> lck(_mtx_on_result);
_map_on_result.clear();
}
{
lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
_dqOnStatusCB.clear();
lock_guard<recursive_mutex> lck(_mtx_on_status);
_deque_on_status.clear();
}
_pPublishTimer.reset();
_publish_timer.reset();
reset();
shutdown(SockException(Err_shutdown,"teardown"));
shutdown(SockException(Err_shutdown, "teardown"));
}
}
void RtmpPusher::onPublishResult(const SockException &ex,bool handshakeCompleted) {
if(!handshakeCompleted){
void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) {
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;
}
if (!handshake_done) {
//播放结果回调
_pPublishTimer.reset();
if(_onPublished){
_onPublished(ex);
_publish_timer.reset();
if (_on_published) {
_on_published(ex);
}
} else {
//播放成功后异常断开回调
if(_onShutdown){
_onShutdown(ex);
if (_on_shutdown) {
_on_shutdown(ex);
}
}
if(ex){
if (ex) {
teardown();
}
}
void RtmpPusher::publish(const string &strUrl) {
void RtmpPusher::publish(const string &url) {
teardown();
string strHost = FindField(strUrl.data(), "://", "/");
_strApp = FindField(strUrl.data(), (strHost + "/").data(), "/");
_strStream = FindField(strUrl.data(), (strHost + "/" + _strApp + "/").data(), NULL);
_strTcUrl = string("rtmp://") + strHost + "/" + _strApp;
string host_url = FindField(url.data(), "://", "/");
_app = FindField(url.data(), (host_url + "/").data(), "/");
_stream_id = FindField(url.data(), (host_url + "/" + _app + "/").data(), NULL);
_tc_url = string("rtmp://") + host_url + "/" + _app;
if (!_strApp.size() || !_strStream.size()) {
onPublishResult(SockException(Err_other,"rtmp url非法"),false);
if (!_app.size() || !_stream_id.size()) {
onPublishResult(SockException(Err_other, "rtmp url非法"), false);
return;
}
DebugL << strHost << " " << _strApp << " " << _strStream;
DebugL << host_url << " " << _app << " " << _stream_id;
auto iPort = atoi(FindField(strHost.data(), ":", NULL).data());
auto iPort = atoi(FindField(host_url.data(), ":", NULL).data());
if (iPort <= 0) {
//rtmp 默认端口1935
iPort = 1935;
} else {
//服务器域名
strHost = FindField(strHost.data(), NULL, ":");
host_url = FindField(host_url.data(), NULL, ":");
}
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
float publishTimeOutSec = (*this)[kTimeoutMS].as<int>() / 1000.0;
_pPublishTimer.reset( new Timer(publishTimeOutSec, [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
_publish_timer.reset(new Timer(publishTimeOutSec, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
return false;
}
strongSelf->onPublishResult(SockException(Err_timeout,"publish rtmp timeout"), false);
strongSelf->onPublishResult(SockException(Err_timeout, "publish rtmp timeout"), false);
return false;
},getPoller()));
}, getPoller()));
if(!(*this)[kNetAdapter].empty()){
if (!(*this)[kNetAdapter].empty()) {
setNetAdapter((*this)[kNetAdapter]);
}
startConnect(strHost, iPort);
startConnect(host_url, iPort);
}
void RtmpPusher::onErr(const SockException &ex){
//定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
onPublishResult(ex, !_publish_timer);
}
void RtmpPusher::onConnect(const SockException &err){
if(err) {
onPublishResult(err,false);
if (err) {
onPublishResult(err, false);
return;
}
//推流器不需要多大的接收缓存,节省内存占用
_sock->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
startClientSession([weakSelf](){
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
startClientSession([weak_self]() {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
strongSelf->sendChunkSize(60000);
strongSelf->send_connect();
strong_self->sendChunkSize(60000);
strong_self->send_connect();
});
}
void RtmpPusher::onRecv(const Buffer::Ptr &pBuf){
void RtmpPusher::onRecv(const Buffer::Ptr &buf){
try {
onParseRtmp(pBuf->data(), pBuf->size());
onParseRtmp(buf->data(), buf->size());
} catch (exception &e) {
SockException ex(Err_other, e.what());
//定时器_pPublishTimer为空后表明握手结束了
onPublishResult(ex,!_pPublishTimer);
onPublishResult(ex, !_publish_timer);
}
}
inline void RtmpPusher::send_connect() {
AMFValue obj(AMF_OBJECT);
obj.set("app", _strApp);
obj.set("app", _app);
obj.set("type", "nonprivate");
obj.set("tcUrl", _strTcUrl);
obj.set("swfUrl", _strTcUrl);
obj.set("tcUrl", _tc_url);
obj.set("swfUrl", _tc_url);
sendInvoke("connect", obj);
addOnResultCB([this](AMFDecoder &dec){
addOnResultCB([this](AMFDecoder &dec) {
//TraceL << "connect result";
dec.load<AMFValue>();
auto val = dec.load<AMFValue>();
auto level = val["level"].as_string();
auto code = val["code"].as_string();
if(level != "status"){
throw std::runtime_error(StrPrinter <<"connect 失败:" << level << " " << code << endl);
if (level != "status") {
throw std::runtime_error(StrPrinter << "connect 失败:" << level << " " << code << endl);
}
send_createStream();
});
@@ -161,23 +167,24 @@ inline void RtmpPusher::send_connect() {
inline void RtmpPusher::send_createStream() {
AMFValue obj(AMF_NULL);
sendInvoke("createStream", obj);
addOnResultCB([this](AMFDecoder &dec){
addOnResultCB([this](AMFDecoder &dec) {
//TraceL << "createStream result";
dec.load<AMFValue>();
_stream_index = dec.load<int>();
send_publish();
});
}
inline void RtmpPusher::send_publish() {
AMFEncoder enc;
enc << "publish" << ++_send_req_id << nullptr << _strStream << _strApp ;
enc << "publish" << ++_send_req_id << nullptr << _stream_id << _app;
sendRequest(MSG_CMD, enc.data());
addOnStatusCB([this](AMFValue &val) {
auto level = val["level"].as_string();
auto code = val["code"].as_string();
if(level != "status") {
throw std::runtime_error(StrPrinter <<"publish 失败:" << level << " " << code << endl);
if (level != "status") {
throw std::runtime_error(StrPrinter << "publish 失败:" << level << " " << code << endl);
}
//start send media
send_metaData();
@@ -185,51 +192,51 @@ inline void RtmpPusher::send_publish() {
}
inline void RtmpPusher::send_metaData(){
auto src = _pMediaSrc.lock();
auto src = _publish_src.lock();
if (!src) {
throw std::runtime_error("the media source was released");
}
AMFEncoder enc;
enc << "@setDataFrame" << "onMetaData" << src->getMetaData();
enc << "@setDataFrame" << "onMetaData" << src->getMetaData();
sendRequest(MSG_DATA, enc.data());
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt){
sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id );
src->getConfigFrame([&](const RtmpPacket::Ptr &pkt) {
sendRtmp(pkt->type_id, _stream_index, pkt, pkt->time_stamp, pkt->chunk_id);
});
_pRtmpReader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weakSelf = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_pRtmpReader->setReadCB([weakSelf](const RtmpMediaSource::RingDataType &pkt){
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
_rtmp_reader = src->getRing()->attach(getPoller());
weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
_rtmp_reader->setReadCB([weak_self](const RtmpMediaSource::RingDataType &pkt) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
int i = 0;
int size = pkt->size();
strongSelf->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp){
if(++i == size){
strongSelf->setSendFlushFlag(true);
strong_self->setSendFlushFlag(false);
pkt->for_each([&](const RtmpPacket::Ptr &rtmp) {
if (++i == size) {
strong_self->setSendFlushFlag(true);
}
strongSelf->sendRtmp(rtmp->type_id, strongSelf->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id);
strong_self->sendRtmp(rtmp->type_id, strong_self->_stream_index, rtmp, rtmp->time_stamp, rtmp->chunk_id);
});
});
_pRtmpReader->setDetachCB([weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->onPublishResult(SockException(Err_other,"媒体源被释放"), !strongSelf->_pPublishTimer);
_rtmp_reader->setDetachCB([weak_self]() {
auto strong_self = weak_self.lock();
if (strong_self) {
strong_self->onPublishResult(SockException(Err_other, "媒体源被释放"), !strong_self->_publish_timer);
}
});
onPublishResult(SockException(Err_success,"success"), false);
onPublishResult(SockException(Err_success, "success"), false);
//提升发送性能
setSocketFlags();
}
void RtmpPusher::setSocketFlags(){
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if(mergeWriteMS > 0) {
if (mergeWriteMS > 0) {
//提高发送性能
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
@@ -237,70 +244,72 @@ void RtmpPusher::setSocketFlags(){
}
void RtmpPusher::onCmd_result(AMFDecoder &dec){
auto iReqId = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtxOnResultCB);
auto it = _mapOnResultCB.find(iReqId);
if(it != _mapOnResultCB.end()){
auto req_id = dec.load<int>();
lock_guard<recursive_mutex> lck(_mtx_on_result);
auto it = _map_on_result.find(req_id);
if (it != _map_on_result.end()) {
it->second(dec);
_mapOnResultCB.erase(it);
}else{
_map_on_result.erase(it);
} else {
WarnL << "unhandled _result";
}
}
void RtmpPusher::onCmd_onStatus(AMFDecoder &dec) {
AMFValue val;
while(true){
while (true) {
val = dec.load<AMFValue>();
if(val.type() == AMF_OBJECT){
if (val.type() == AMF_OBJECT) {
break;
}
}
if(val.type() != AMF_OBJECT){
if (val.type() != AMF_OBJECT) {
throw std::runtime_error("onStatus:the result object was not found");
}
lock_guard<recursive_mutex> lck(_mtxOnStatusCB);
if(_dqOnStatusCB.size()){
_dqOnStatusCB.front()(val);
_dqOnStatusCB.pop_front();
}else{
lock_guard<recursive_mutex> lck(_mtx_on_status);
if (_deque_on_status.size()) {
_deque_on_status.front()(val);
_deque_on_status.pop_front();
} else {
auto level = val["level"];
auto code = val["code"].as_string();
if(level.type() == AMF_STRING){
if(level.as_string() != "status"){
throw std::runtime_error(StrPrinter <<"onStatus 失败:" << level.as_string() << " " << code << endl);
if (level.type() == AMF_STRING) {
if (level.as_string() != "status") {
throw std::runtime_error(StrPrinter << "onStatus 失败:" << level.as_string() << " " << code << endl);
}
}
}
}
void RtmpPusher::onRtmpChunk(RtmpPacket &chunkData) {
switch (chunkData.type_id) {
void RtmpPusher::onRtmpChunk(RtmpPacket &chunk_data) {
switch (chunk_data.type_id) {
case MSG_CMD:
case MSG_CMD3: {
typedef void (RtmpPusher::*rtmpCMDHandle)(AMFDecoder &dec);
static unordered_map<string, rtmpCMDHandle> g_mapCmd;
static onceToken token([]() {
g_mapCmd.emplace("_error",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("_result",&RtmpPusher::onCmd_result);
g_mapCmd.emplace("onStatus",&RtmpPusher::onCmd_onStatus);
g_mapCmd.emplace("_error", &RtmpPusher::onCmd_result);
g_mapCmd.emplace("_result", &RtmpPusher::onCmd_result);
g_mapCmd.emplace("onStatus", &RtmpPusher::onCmd_onStatus);
});
AMFDecoder dec(chunkData.buffer, 0);
AMFDecoder dec(chunk_data.buffer, 0);
std::string type = dec.load<std::string>();
auto it = g_mapCmd.find(type);
if(it != g_mapCmd.end()){
if (it != g_mapCmd.end()) {
auto fun = it->second;
(this->*fun)(dec);
}else{
} else {
WarnL << "can not support cmd:" << type;
}
}
break;
}
default:
//WarnL << "unhandled message:" << (int) chunkData.type_id << hexdump(chunkData.buffer.data(), chunkData.buffer.size());
//WarnL << "unhandled message:" << (int) chunk_data.type_id << hexdump(chunk_data.buffer.data(), chunk_data.buffer.size());
break;
}
}
}