mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2026-06-21 08:22:21 +08:00
rtp级联(ps/ts/es)新增支持gop缓存功能 (#2395)
该修改主要解决rtp级联(调用startSendRtp接口)未做gop缓存导致上级无法秒开的问题。 同时通过RingBuffer对象线程隔离的特性,实现了在断连续推场景下归属线程切换导致的线程安全问题。 用户如未使用rtp级联功能,请修改配置文件关闭GOP缓存(rtp_proxy.gop_cache=0)以便节省内存。 --------- Co-authored-by: 夏楚 <771730766@qq.com>
This commit is contained in:
@@ -21,6 +21,17 @@ namespace toolkit {
|
||||
|
||||
namespace mediakit {
|
||||
|
||||
namespace {
|
||||
class MediaSourceForMuxer : public MediaSource {
|
||||
public:
|
||||
MediaSourceForMuxer(const MultiMediaSourceMuxer::Ptr &muxer)
|
||||
: MediaSource("muxer", muxer->getVhost(), muxer->getApp(), muxer->getStreamId()) {
|
||||
MediaSource::setListener(muxer);
|
||||
}
|
||||
int readerCount() override { return 0; }
|
||||
};
|
||||
} // namespace
|
||||
|
||||
static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const ProtocolOption &option){
|
||||
auto recorder = Recorder::createRecorder(type, sender.getVhost(), sender.getApp(), sender.getId(), option);
|
||||
for (auto &track : tracks) {
|
||||
@@ -144,20 +155,15 @@ void MultiMediaSourceMuxer::setTrackListener(const std::weak_ptr<Listener> &list
|
||||
|
||||
int MultiMediaSourceMuxer::totalReaderCount() const {
|
||||
auto hls = _hls;
|
||||
auto ret = (_rtsp ? _rtsp->readerCount() : 0) +
|
||||
(_rtmp ? _rtmp->readerCount() : 0) +
|
||||
(_ts ? _ts->readerCount() : 0) +
|
||||
#if defined(ENABLE_MP4)
|
||||
(_fmp4 ? _fmp4->readerCount() : 0) +
|
||||
#endif
|
||||
(_mp4 ? _option.mp4_as_player : 0) +
|
||||
(hls ? hls->readerCount() : 0);
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
return ret + (int)_rtp_sender.size();
|
||||
#else
|
||||
return ret;
|
||||
#endif
|
||||
return (_rtsp ? _rtsp->readerCount() : 0) +
|
||||
(_rtmp ? _rtmp->readerCount() : 0) +
|
||||
(_ts ? _ts->readerCount() : 0) +
|
||||
#if defined(ENABLE_MP4)
|
||||
(_fmp4 ? _fmp4->readerCount() : 0) +
|
||||
#endif
|
||||
(_mp4 ? _option.mp4_as_player : 0) +
|
||||
(hls ? hls->readerCount() : 0) +
|
||||
(_ring ? _ring->readerCount() : 0);
|
||||
}
|
||||
|
||||
void MultiMediaSourceMuxer::setTimeStamp(uint32_t stamp) {
|
||||
@@ -237,42 +243,45 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type
|
||||
|
||||
void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceEvent::SendRtpArgs &args, const std::function<void(uint16_t, const toolkit::SockException &)> cb) {
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
createGopCacheIfNeed();
|
||||
|
||||
auto ring = _ring;
|
||||
auto ssrc = args.ssrc;
|
||||
auto tracks = getTracks(false);
|
||||
auto rtp_sender = std::make_shared<RtpSender>(getOwnerPoller(sender));
|
||||
weak_ptr<MediaSource> weak_sender = sender.shared_from_this();
|
||||
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
|
||||
rtp_sender->startSend(args, [args, weak_self, rtp_sender, cb, weak_sender](uint16_t local_port, const SockException &ex) mutable {
|
||||
|
||||
rtp_sender->startSend(args, [ssrc, weak_self, rtp_sender, cb, tracks, ring](uint16_t local_port, const SockException &ex) mutable {
|
||||
cb(local_port, ex);
|
||||
auto strong_self = weak_self.lock();
|
||||
if (!strong_self || ex) {
|
||||
return;
|
||||
}
|
||||
if (!strong_self->getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread()) {
|
||||
// poller线程发生变更了
|
||||
return;
|
||||
}
|
||||
for (auto &track : strong_self->getTracks(false)) {
|
||||
|
||||
for (auto &track : tracks) {
|
||||
rtp_sender->addTrack(track);
|
||||
}
|
||||
rtp_sender->addTrackCompleted();
|
||||
|
||||
auto ssrc = args.ssrc;
|
||||
rtp_sender->setOnClose([weak_self, ssrc, weak_sender](const toolkit::SockException &ex) {
|
||||
rtp_sender->setOnClose([weak_self, ssrc](const toolkit::SockException &ex) {
|
||||
if (auto strong_self = weak_self.lock()) {
|
||||
WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex.what();
|
||||
strong_self->_rtp_sender.erase(ssrc);
|
||||
//触发观看人数统计
|
||||
auto strong_sender = weak_sender.lock();
|
||||
if (strong_sender) {
|
||||
strong_self->onReaderChanged(*strong_sender, strong_self->totalReaderCount());
|
||||
}
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex);
|
||||
// 可能归属线程发生变更
|
||||
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
|
||||
WarnL << "stream:" << strong_self->shortUrl() << " stop send rtp:" << ssrc << ", reason:" << ex.what();
|
||||
strong_self->_rtp_sender.erase(ssrc);
|
||||
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastSendRtpStopped, *strong_self, ssrc, ex);
|
||||
});
|
||||
}
|
||||
});
|
||||
strong_self->_rtp_sender[args.ssrc] = std::move(rtp_sender);
|
||||
auto strong_sender = weak_sender.lock();
|
||||
if (strong_sender) {
|
||||
strong_self->onReaderChanged(*strong_sender, strong_self->totalReaderCount());
|
||||
}
|
||||
|
||||
auto reader = ring->attach(EventPoller::getCurrentPoller());
|
||||
reader->setReadCB([rtp_sender](const Frame::Ptr &frame) {
|
||||
rtp_sender->inputFrame(frame);
|
||||
});
|
||||
|
||||
// 可能归属线程发生变更
|
||||
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
|
||||
strong_self->_rtp_sender[ssrc] = std::move(reader);
|
||||
});
|
||||
});
|
||||
#else
|
||||
cb(0, SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏"));
|
||||
@@ -281,10 +290,6 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const MediaSourceE
|
||||
|
||||
bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string &ssrc) {
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
onceToken token(nullptr, [&]() {
|
||||
//关闭rtp推流,可能触发无人观看事件
|
||||
onReaderChanged(sender, totalReaderCount());
|
||||
});
|
||||
if (ssrc.empty()) {
|
||||
//关闭全部
|
||||
auto size = _rtp_sender.size();
|
||||
@@ -369,9 +374,33 @@ void MultiMediaSourceMuxer::onAllTrackReady() {
|
||||
if (listener) {
|
||||
listener->onAllTrackReady();
|
||||
}
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
GET_CONFIG(bool, gop_cache, RtpProxy::kGopCache);
|
||||
if (gop_cache) {
|
||||
createGopCacheIfNeed();
|
||||
}
|
||||
#endif
|
||||
InfoL << "stream: " << shortUrl() << " , codec info: " << getTrackInfoStr(this);
|
||||
}
|
||||
|
||||
void MultiMediaSourceMuxer::createGopCacheIfNeed() {
|
||||
if (_ring) {
|
||||
return;
|
||||
}
|
||||
weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
|
||||
_ring = std::make_shared<RingType>(1024, [weak_self](int size) {
|
||||
auto strong_self = weak_self.lock();
|
||||
if (strong_self) {
|
||||
// 切换到归属线程
|
||||
strong_self->getOwnerPoller(MediaSource::NullMediaSource())->async([=]() {
|
||||
auto src = std::make_shared<MediaSourceForMuxer>(strong_self);
|
||||
strong_self->onReaderChanged(*src, strong_self->totalReaderCount());
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
void MultiMediaSourceMuxer::resetTracks() {
|
||||
MediaSink::resetTracks();
|
||||
|
||||
@@ -390,12 +419,6 @@ void MultiMediaSourceMuxer::resetTracks() {
|
||||
}
|
||||
#endif
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
for (auto &pr : _rtp_sender) {
|
||||
pr.second->resetTracks();
|
||||
}
|
||||
#endif
|
||||
|
||||
//拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题
|
||||
auto hls = _hls;
|
||||
if (hls) {
|
||||
@@ -443,11 +466,17 @@ bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
|
||||
}
|
||||
#endif
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
for (auto &pr : _rtp_sender) {
|
||||
ret = pr.second->inputFrame(frame) ? true : ret;
|
||||
if (_ring) {
|
||||
if (frame->getTrackType() == TrackVideo) {
|
||||
// 视频时,遇到第一帧配置帧或关键帧则标记为gop开始处
|
||||
auto video_key_pos = frame->keyFrame() || frame->configFrame();
|
||||
_ring->write(frame, video_key_pos && !_video_key_pos);
|
||||
_video_key_pos = video_key_pos;
|
||||
} else {
|
||||
// 没有视频时,设置is_key为true,目的是关闭gop缓存
|
||||
_ring->write(frame, !haveVideo());
|
||||
}
|
||||
}
|
||||
#endif //ENABLE_RTPPROXY
|
||||
return ret;
|
||||
}
|
||||
|
||||
@@ -457,19 +486,15 @@ bool MultiMediaSourceMuxer::isEnabled(){
|
||||
//无人观看时,每次检查是否真的无人观看
|
||||
//有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能)
|
||||
auto hls = _hls;
|
||||
auto flag = (_rtmp ? _rtmp->isEnabled() : false) ||
|
||||
(_rtsp ? _rtsp->isEnabled() : false) ||
|
||||
(_ts ? _ts->isEnabled() : false) ||
|
||||
#if defined(ENABLE_MP4)
|
||||
(_fmp4 ? _fmp4->isEnabled() : false) ||
|
||||
#endif
|
||||
(hls ? hls->isEnabled() : false) || _mp4;
|
||||
_is_enable = (_rtmp ? _rtmp->isEnabled() : false) ||
|
||||
(_rtsp ? _rtsp->isEnabled() : false) ||
|
||||
(_ts ? _ts->isEnabled() : false) ||
|
||||
#if defined(ENABLE_MP4)
|
||||
(_fmp4 ? _fmp4->isEnabled() : false) ||
|
||||
#endif
|
||||
(_ring ? (bool)_ring->readerCount() : false) ||
|
||||
(hls ? hls->isEnabled() : false) || _mp4;
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
_is_enable = flag || _rtp_sender.size();
|
||||
#else
|
||||
_is_enable = flag;
|
||||
#endif //ENABLE_RTPPROXY
|
||||
if (_is_enable) {
|
||||
//无人观看时,不刷新计时器,因为无人观看时每次都会检查一遍,所以刷新计数器无意义且浪费cpu
|
||||
_last_check.resetTime();
|
||||
|
||||
Reference in New Issue
Block a user