rtsp/rtmp推流支持断连续推: #1240, #1300

This commit is contained in:
ziyue
2022-01-10 16:37:50 +08:00
parent 262af8dfeb
commit f5efd232a9
10 changed files with 240 additions and 130 deletions

View File

@@ -24,9 +24,9 @@ RtmpSession::~RtmpSession() {
}
void RtmpSession::onError(const SockException& err) {
bool isPlayer = !_publisher_src;
uint64_t duration = _ticker.createdTime()/1000;
WarnP(this) << (isPlayer ? "RTMP播放器(" : "RTMP推流器(")
bool is_player = !_push_src;
uint64_t duration = _ticker.createdTime() / 1000;
WarnP(this) << (is_player ? "RTMP播放器(" : "RTMP推流器(")
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
@@ -34,26 +34,35 @@ void RtmpSession::onError(const SockException& err) {
<< ",耗时(s):" << duration;
//流量统计事件广播
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
if(_total_bytes >= iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, isPlayer, static_cast<SockInfo &>(*this));
if (_total_bytes >= iFlowThreshold * 1024) {
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _media_info, _total_bytes, duration, is_player, static_cast<SockInfo &>(*this));
}
GET_CONFIG(uint32_t, continue_push_ms, General::kContinuePushMS);
if (_push_src && continue_push_ms) {
//取消所有权
_push_src_ownership = nullptr;
//延时10秒注销流
auto push_src = std::move(_push_src);
getPoller()->doDelayTask(continue_push_ms, [push_src]() { return 0; });
}
}
void RtmpSession::onManager() {
GET_CONFIG(uint32_t,handshake_sec,Rtmp::kHandshakeSecond);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
GET_CONFIG(uint32_t, handshake_sec, Rtmp::kHandshakeSecond);
GET_CONFIG(uint32_t, keep_alive_sec, Rtmp::kKeepAliveSecond);
if (_ticker.createdTime() > handshake_sec * 1000) {
if (!_ring_reader && !_publisher_src) {
shutdown(SockException(Err_timeout,"illegal connection"));
if (!_ring_reader && !_push_src) {
shutdown(SockException(Err_timeout, "illegal connection"));
}
}
if (_publisher_src) {
//publisher
if (_push_src) {
// push
if (_ticker.elapsedTime() > keep_alive_sec * 1000) {
shutdown(SockException(Err_timeout,"recv data from rtmp pusher timeout"));
shutdown(SockException(Err_timeout, "recv data from rtmp pusher timeout"));
}
}
}
@@ -121,31 +130,61 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_media_info.parse(_tc_url + "/" + getStreamId(dec.load<std::string>()));
_media_info._schema = RTMP_SCHEMA;
auto on_res = [this,pToken](const string &err, bool enableHls, bool enableMP4){
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
_media_info._vhost,
_media_info._app,
_media_info._streamid));
bool auth_success = err.empty();
bool ok = (!src && !_publisher_src && auth_success);
AMFValue status(AMF_OBJECT);
status.set("level", ok ? "status" : "error");
status.set("code", ok ? "NetStream.Publish.Start" : (auth_success ? "NetStream.Publish.BadName" : "NetStream.Publish.BadAuth"));
status.set("description", ok ? "Started publishing stream." : (auth_success ? "Already publishing." : err.data()));
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
if (!ok) {
string errMsg = StrPrinter << (auth_success ? "already publishing:" : err.data()) << " "
<< _media_info._vhost << " "
<< _media_info._app << " "
<< _media_info._streamid;
shutdown(SockException(Err_shutdown,errMsg));
auto on_res = [this, pToken](const string &err, bool enableHls, bool enableMP4) {
if (!err.empty()) {
sendStatus({ "level", "error",
"code", "NetStream.Publish.BadAuth",
"description", err,
"clientid", "0" });
shutdown(SockException(Err_shutdown, StrPrinter << "Unauthorized:" << err));
return;
}
_publisher_src.reset(new RtmpMediaSourceImp(_media_info._vhost, _media_info._app, _media_info._streamid));
_publisher_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_publisher_src->setProtocolTranslation(enableHls, enableMP4);
assert(!_push_src);
auto src = MediaSource::find(RTMP_SCHEMA, _media_info._vhost, _media_info._app, _media_info._streamid);
auto push_failed = (bool)src;
while (src) {
//尝试断连后继续推流
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSourceImp>(src);
if (!rtmp_src) {
//源不是rtmp推流产生的
break;
}
auto ownership = rtmp_src->getOwnership();
if (!ownership) {
//获取推流源所有权失败
break;
}
_push_src = std::move(rtmp_src);
_push_src_ownership = std::move(ownership);
push_failed = false;
break;
}
if (push_failed) {
sendStatus({"level", "error",
"code", "NetStream.Publish.BadName",
"description", "Already publishing.",
"clientid", "0" });
shutdown(SockException(Err_shutdown, StrPrinter << "Already publishing:" << err));
return;
}
if (!_push_src) {
_push_src = std::make_shared<RtmpMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
//获取所有权
_push_src_ownership = _push_src->getOwnership();
_push_src->setProtocolTranslation(enableHls, enableMP4);
}
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
sendStatus({"level", "status",
"code", "NetStream.Publish.Start",
"description", "Started publishing stream.",
"clientid", "0" });
setSocketFlags();
};
@@ -178,15 +217,27 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
}
void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
AMFValue status(AMF_OBJECT);
status.set("level", "status");
status.set("code", "NetStream.Unpublish.Success");
status.set("description", "Stop publishing.");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", "NetStream.Unpublish.Success",
"description", "Stop publishing." });
throw std::runtime_error(StrPrinter << "Stop publishing" << endl);
}
void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr &src){
void RtmpSession::sendStatus(const std::initializer_list<string> &key_value) {
AMFValue status(AMF_OBJECT);
int i = 0;
string key;
for (auto &val : key_value) {
if (++i % 2 == 0) {
status.set(key, val);
} else {
key = val;
}
}
sendReply("onStatus", nullptr, status);
}
void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr &src) {
bool auth_success = err.empty();
bool ok = (src.operator bool() && auth_success);
if (ok) {
@@ -194,13 +245,12 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA);
}
// onStatus(NetStream.Play.Reset)
AMFValue status(AMF_OBJECT);
status.set("level", ok ? "status" : "error");
status.set("code", ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth"));
status.set("description", ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data()));
status.set("details", _media_info._streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", (ok ? "status" : "error"),
"code", (ok ? "NetStream.Play.Reset" : (auth_success ? "NetStream.Play.StreamNotFound" : "NetStream.Play.BadAuth")),
"description", (ok ? "Resetting and playing." : (auth_success ? "No such stream." : err.data())),
"details", _media_info._streamid,
"clientid", "0" });
if (!ok) {
string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " "
<< _media_info._vhost << " "
@@ -211,13 +261,12 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
}
// onStatus(NetStream.Play.Start)
status.clear();
status.set("level", "status");
status.set("code", "NetStream.Play.Start");
status.set("description", "Started playing.");
status.set("details", _media_info._streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", "NetStream.Play.Start",
"description", "Started playing." ,
"details", _media_info._streamid,
"clientid", "0"});
// |RtmpSampleAccess(true, true)
AMFEncoder invoke;
@@ -232,13 +281,11 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
sendResponse(MSG_DATA, invoke.data());
//onStatus(NetStream.Play.PublishNotify)
status.clear();
status.set("level", "status");
status.set("code", "NetStream.Play.PublishNotify");
status.set("description", "Now published.");
status.set("details", _media_info._streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", "NetStream.Play.PublishNotify",
"description", "Now published." ,
"details", _media_info._streamid,
"clientid", "0"});
auto &metadata = src->getMetaData();
if(metadata){
@@ -280,7 +327,7 @@ void RtmpSession::sendPlayResponse(const string &err,const RtmpMediaSource::Ptr
strongSelf->shutdown(SockException(Err_shutdown,"rtmp ring buffer detached"));
});
src->pause(false);
_player_src = src;
_play_src = src;
//提高服务器发送性能
setSocketFlags();
}
@@ -386,14 +433,14 @@ void RtmpSession::onCmd_pause(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */
bool paused = dec.load<bool>();
TraceP(this) << paused;
AMFValue status(AMF_OBJECT);
status.set("level", "status");
status.set("code", paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify");
status.set("description", paused ? "Paused stream." : "Unpaused stream.");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", (paused ? "NetStream.Pause.Notify" : "NetStream.Unpause.Notify"),
"description", (paused ? "Paused stream." : "Unpaused stream.")});
//streamBegin
sendUserControl(paused ? CONTROL_STREAM_EOF : CONTROL_STREAM_BEGIN, STREAM_MEDIA);
auto strongSrc = _player_src.lock();
auto strongSrc = _play_src.lock();
if (strongSrc) {
strongSrc->pause(paused);
}
@@ -405,17 +452,16 @@ void RtmpSession::onCmd_playCtrl(AMFDecoder &dec) {
int ctrlType = ctrlObj["ctrlType"].as_integer();
float speed = ctrlObj["speed"].as_number();
AMFValue status(AMF_OBJECT);
status.set("level", "status");
status.set("code", "NetStream.Speed.Notify");
status.set("description", "Speeding");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", "NetStream.Speed.Notify",
"description", "Speeding"});
//streamBegin
sendUserControl(CONTROL_STREAM_EOF, STREAM_MEDIA);
auto stongSrc = _player_src.lock();
if (stongSrc) {
stongSrc->speed(speed);
auto strong_src = _play_src.lock();
if (strong_src) {
strong_src->speed(speed);
}
}
@@ -424,7 +470,7 @@ void RtmpSession::setMetaData(AMFDecoder &dec) {
if (type != "onMetaData") {
throw std::runtime_error("can only set metadata");
}
_publisher_metadata = dec.load<AMFValue>();
_push_metadata = dec.load<AMFValue>();
}
void RtmpSession::onProcessCmd(AMFDecoder &dec) {
@@ -471,7 +517,7 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
setMetaData(dec);
} else if (type == "onMetaData") {
//兼容某些不规范的推流器
_publisher_metadata = dec.load<AMFValue>();
_push_metadata = dec.load<AMFValue>();
} else {
TraceP(this) << "unknown notify:" << type;
}
@@ -480,8 +526,8 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
case MSG_AUDIO:
case MSG_VIDEO: {
if (!_publisher_src) {
WarnL << "Not a rtmp publisher!";
if (!_push_src) {
WarnL << "Not a rtmp push!";
return;
}
GET_CONFIG(bool, rtmp_modify_stamp, Rtmp::kModifyStamp);
@@ -493,9 +539,9 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
if (!_set_meta_data) {
_set_meta_data = true;
_publisher_src->setMetaData(_publisher_metadata ? _publisher_metadata : TitleMeta().getMetadata());
_push_src->setMetaData(_push_metadata ? _push_metadata : TitleMeta().getMetadata());
}
_publisher_src->onWrite(std::move(packet));
_push_src->onWrite(std::move(packet));
break;
}
@@ -507,15 +553,13 @@ void RtmpSession::onRtmpChunk(RtmpPacket::Ptr packet) {
void RtmpSession::onCmd_seek(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */
AMFValue status(AMF_OBJECT);
status.set("level", "status");
status.set("code", "NetStream.Seek.Notify");
status.set("description", "Seeking.");
sendReply("onStatus", nullptr, status);
sendStatus({ "level", "status",
"code", "NetStream.Seek.Notify",
"description", "Seeking."});
auto milliSeconds = (uint32_t)(dec.load<AMFValue>().as_number());
InfoP(this) << "rtmp seekTo(ms):" << milliSeconds;
auto strong_src = _player_src.lock();
auto strong_src = _play_src.lock();
if (strong_src) {
strong_src->seekTo(milliSeconds);
}
@@ -527,7 +571,7 @@ void RtmpSession::onSendMedia(const RtmpPacket::Ptr &pkt) {
bool RtmpSession::close(MediaSource &sender,bool force) {
//此回调在其他线程触发
if(!_publisher_src || (!force && _publisher_src->totalReaderCount())){
if(!_push_src || (!force && _push_src->totalReaderCount())){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
@@ -536,7 +580,7 @@ bool RtmpSession::close(MediaSource &sender,bool force) {
}
int RtmpSession::totalReaderCount(MediaSource &sender) {
return _publisher_src ? _publisher_src->totalReaderCount() : sender.readerCount();
return _push_src ? _push_src->totalReaderCount() : sender.readerCount();
}
MediaOriginType RtmpSession::getOriginType(MediaSource &sender) const{