diff --git a/3rdpart/media-server b/3rdpart/media-server index 66033ee4..698840bf 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 66033ee4750a147a76ed5ff7785385e713d45926 +Subproject commit 698840bf5e4d94b14e5629a6acee6f9a039bbd5c diff --git a/README.md b/README.md index 087c807c..1d12175d 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ - 支持UDP/TCP国标RTP(PS或TS)推流服务器,可以转换成RTSP/RTMP/HLS等协议 - 支持RTSP/RTMP/HLS转国标推流客户端,支持TCP/UDP模式,提供相应restful api - 支持H264/H265/AAC/G711/OPUS编码 + - 支持海康ehome推流 - MP4点播与录制 - 支持录制为FLV/HLS/MP4 diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index 35856aaa..6ae12e85 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -233,7 +233,7 @@ void FFmpegSource::setOnClose(const function &cb){ } bool FFmpegSource::close(MediaSource &sender, bool force) { - auto listener = _listener.lock(); + auto listener = getDelegate(); if(listener && !listener->close(sender,force)){ //关闭失败 return false; @@ -258,17 +258,11 @@ std::shared_ptr FFmpegSource::getOriginSock(MediaSource &sender) const } void FFmpegSource::onGetMediaSource(const MediaSource::Ptr &src) { - auto listener = src->getListener(); + auto listener = src->getListener(true); if (listener.lock().get() != this) { - //防止多次进入onGetMediaSource函数导致无效递归调用的bug - _listener = listener; + //防止多次进入onGetMediaSource函数导致无限递归调用的bug + setDelegate(listener); src->setListener(shared_from_this()); - } else { - WarnL << "多次触发onGetMediaSource事件:" - << src->getSchema() << "/" - << src->getVhost() << "/" - << src->getApp() << "/" - << src->getId(); } } diff --git a/server/WebApi.cpp b/server/WebApi.cpp index 14ea493e..aa67c1ad 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -504,12 +504,12 @@ void installWebApi() { allArgs["vhost"], allArgs["app"], allArgs["stream"]); - if(src){ + if (src) { bool flag = src->close(allArgs["force"].as()); val["result"] = flag ? 0 : -1; val["msg"] = flag ? "success" : "close failed"; - val["code"] = API::OtherFailed; - }else{ + val["code"] = flag ? API::Success : API::OtherFailed; + } else { val["result"] = -2; val["msg"] = "can not find the stream"; val["code"] = API::OtherFailed; @@ -1309,6 +1309,8 @@ void installWebApi() { } void unInstallWebApi(){ + RtpSelector::Instance().clear(); + { lock_guard lck(s_proxyMapMtx); s_proxyMap.clear(); @@ -1318,6 +1320,7 @@ void unInstallWebApi(){ lock_guard lck(s_ffmpegMapMtx); s_ffmpegMap.clear(); } + { #if defined(ENABLE_RTPPROXY) lock_guard lck(s_rtpServerMapMtx); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index d548a7d1..2d6b04ab 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -20,16 +20,16 @@ recursive_mutex s_media_source_mtx; MediaSource::SchemaVhostAppStreamMap s_media_source_map; string getOriginTypeString(MediaOriginType type){ -#define SWITCH_CASE(type) case type : return #type +#define SWITCH_CASE(type) case MediaOriginType::type : return #type switch (type) { - SWITCH_CASE(MediaOriginType::unknown); - SWITCH_CASE(MediaOriginType::rtmp_push); - SWITCH_CASE(MediaOriginType::rtsp_push); - SWITCH_CASE(MediaOriginType::rtp_push); - SWITCH_CASE(MediaOriginType::pull); - SWITCH_CASE(MediaOriginType::ffmpeg_pull); - SWITCH_CASE(MediaOriginType::mp4_vod); - SWITCH_CASE(MediaOriginType::device_chn); + SWITCH_CASE(unknown); + SWITCH_CASE(rtmp_push); + SWITCH_CASE(rtsp_push); + SWITCH_CASE(rtp_push); + SWITCH_CASE(pull); + SWITCH_CASE(ffmpeg_pull); + SWITCH_CASE(mp4_vod); + SWITCH_CASE(device_chn); } } @@ -92,8 +92,19 @@ void MediaSource::setListener(const std::weak_ptr &listener){ _listener = listener; } -const std::weak_ptr& MediaSource::getListener() const{ - return _listener; +std::weak_ptr MediaSource::getListener(bool next) const{ + if (!next) { + return _listener; + } + auto listener = dynamic_pointer_cast(_listener.lock()); + if (!listener) { + //不是MediaSourceEventInterceptor对象或者对象已经销毁 + return _listener; + } + //获取被拦截的对象 + auto next_obj = listener->getDelegate(); + //有则返回之 + return next_obj ? next_obj : _listener; } int MediaSource::totalReaderCount(){ @@ -168,7 +179,7 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ auto listener = _listener.lock(); if (!listener) { cb(SockException(Err_other, "尚未设置事件监听器")); @@ -627,7 +638,7 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ auto listener = _listener.lock(); if (listener) { listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); @@ -644,9 +655,20 @@ bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){ return false; } +void MediaSourceEventInterceptor::setDelegate(const std::weak_ptr &listener) { + if (listener.lock().get() == this) { + throw std::invalid_argument("can not set self as a delegate"); + } + _listener = listener; +} + +std::shared_ptr MediaSourceEventInterceptor::getDelegate() const{ + return _listener.lock(); +} + /////////////////////////////////////FlushPolicy////////////////////////////////////// -static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size) { +static bool isFlushAble_default(bool is_video, uint64_t last_stamp, uint64_t new_stamp, int cache_size) { if (new_stamp + 500 < last_stamp) { //时间戳回退比较大(可能seek中),由于rtp中时间戳是pts,是可能存在一定程度的回退的 return true; @@ -656,7 +678,7 @@ static bool isFlushAble_default(bool is_video, uint32_t last_stamp, uint32_t new return last_stamp != new_stamp || cache_size >= 1024; } -static bool isFlushAble_merge(bool is_video, uint32_t last_stamp, uint32_t new_stamp, int cache_size, int merge_ms) { +static bool isFlushAble_merge(bool is_video, uint64_t last_stamp, uint64_t new_stamp, int cache_size, int merge_ms) { if (new_stamp + 500 < last_stamp) { //时间戳回退比较大(可能seek中),由于rtp中时间戳是pts,是可能存在一定程度的回退的 return true; @@ -672,7 +694,7 @@ static bool isFlushAble_merge(bool is_video, uint32_t last_stamp, uint32_t new_s return cache_size >= 1024; } -bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size) { +bool FlushPolicy::isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size) { bool flush_flag = false; if (is_key && is_video) { //遇到关键帧flush掉前面的数据,确保关键帧为该组数据的第一帧,确保GOP缓存有效 diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 44f8d3be..5df31f50 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -83,7 +83,7 @@ public: // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; // 停止发送ps-rtp virtual bool stopSendRtp(MediaSource &sender) {return false; } @@ -97,6 +97,9 @@ public: MediaSourceEventInterceptor(){} ~MediaSourceEventInterceptor() override {} + void setDelegate(const std::weak_ptr &listener); + std::shared_ptr getDelegate() const; + MediaOriginType getOriginType(MediaSource &sender) const override; string getOriginUrl(MediaSource &sender) const override; std::shared_ptr getOriginSock(MediaSource &sender) const override; @@ -109,10 +112,10 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; bool stopSendRtp(MediaSource &sender) override; -protected: +private: std::weak_ptr _listener; }; @@ -226,9 +229,9 @@ public: ////////////////MediaSourceEvent相关接口实现//////////////// // 设置监听者 - void setListener(const std::weak_ptr &listener); + virtual void setListener(const std::weak_ptr &listener); // 获取监听者 - const std::weak_ptr& getListener() const; + std::weak_ptr getListener(bool next = false) const; // 本协议获取观看者个数,可能返回本协议的观看人数,也可能返回总人数 virtual int readerCount() = 0; @@ -253,7 +256,7 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb); + void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb); // 停止发送ps-rtp bool stopSendRtp(); @@ -301,18 +304,10 @@ public: FlushPolicy() = default; ~FlushPolicy() = default; - uint32_t getStamp(const RtpPacket::Ptr &packet) { - return packet->timeStamp; - } - - uint32_t getStamp(const RtmpPacket::Ptr &packet) { - return packet->time_stamp; - } - - bool isFlushAble(bool is_video, bool is_key, uint32_t new_stamp, int cache_size); + bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, int cache_size); private: - uint32_t _last_stamp[2] = {0, 0}; + uint64_t _last_stamp[2] = {0, 0}; }; /// 合并写缓存模板 @@ -328,8 +323,8 @@ public: virtual ~PacketCache() = default; - void inputPacket(bool is_video, std::shared_ptr pkt, bool key_pos) { - if (_policy.isFlushAble(is_video, key_pos, _policy.getStamp(pkt), _cache->size())) { + void inputPacket(uint64_t stamp, bool is_video, std::shared_ptr pkt, bool key_pos) { + if (_policy.isFlushAble(is_video, key_pos, stamp, _cache->size())) { flushAll(); } diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index d610a550..5fb361c6 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -323,7 +323,7 @@ MultiMediaSourceMuxer::MultiMediaSourceMuxer(const string &vhost, const string & } void MultiMediaSourceMuxer::setMediaListener(const std::weak_ptr &listener) { - _listener = listener; + setDelegate(listener); //拦截事件 _muxer->setMediaListener(shared_from_this()); } @@ -345,7 +345,7 @@ vector MultiMediaSourceMuxer::getTracks(MediaSource &sender, bool tr } int MultiMediaSourceMuxer::totalReaderCount(MediaSource &sender) { - auto listener = _listener.lock(); + auto listener = getDelegate(); if (!listener) { return totalReaderCount(); } @@ -360,21 +360,21 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ #if defined(ENABLE_RTPPROXY) - auto ps_rtp_sender = std::make_shared(ssrc); + RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); - ps_rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, ps_rtp_sender, cb](const SockException &ex) { + rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) { cb(ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { return; } for (auto &track : strong_self->_muxer->getTracks(false)) { - ps_rtp_sender->addTrack(track); + rtp_sender->addTrack(track); } - ps_rtp_sender->addTrackCompleted(); - strong_self->_ps_rtp_sender = ps_rtp_sender; + rtp_sender->addTrackCompleted(); + strong_self->_rtp_sender = rtp_sender; }); #else cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); @@ -383,8 +383,8 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ #if defined(ENABLE_RTPPROXY) - if (_ps_rtp_sender) { - _ps_rtp_sender = nullptr; + if (_rtp_sender) { + _rtp_sender = nullptr; return true; } #endif//ENABLE_RTPPROXY @@ -473,9 +473,9 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { _muxer->inputFrame(frame); #if defined(ENABLE_RTPPROXY) - auto ps_rtp_sender = _ps_rtp_sender; - if (ps_rtp_sender) { - ps_rtp_sender->inputFrame(frame); + auto rtp_sender = _rtp_sender; + if (rtp_sender) { + rtp_sender->inputFrame(frame); } #endif //ENABLE_RTPPROXY @@ -483,7 +483,7 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { bool MultiMediaSourceMuxer::isEnabled(){ #if defined(ENABLE_RTPPROXY) - return (_muxer->isEnabled() || _ps_rtp_sender); + return (_muxer->isEnabled() || _rtp_sender); #else return _muxer->isEnabled(); #endif //ENABLE_RTPPROXY diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index 42d1a965..c8c56402 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -12,7 +12,7 @@ #define ZLMEDIAKIT_MULTIMEDIASOURCEMUXER_H #include "Common/Stamp.h" -#include "Rtp/PSRtpSender.h" +#include "Rtp/RtpSender.h" #include "Record/Recorder.h" #include "Record/HlsRecorder.h" #include "Record/HlsMediaSource.h" @@ -143,7 +143,7 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, uint32_t ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; /** * 停止ps-rtp发送 @@ -188,7 +188,7 @@ private: MultiMuxerPrivate::Ptr _muxer; std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) - PSRtpSender::Ptr _ps_rtp_sender; + RtpSender::Ptr _rtp_sender; #endif //ENABLE_RTPPROXY }; diff --git a/src/Extension/H264Rtp.cpp b/src/Extension/H264Rtp.cpp index 8836385f..2dac8cb7 100644 --- a/src/Extension/H264Rtp.cpp +++ b/src/Extension/H264Rtp.cpp @@ -195,13 +195,13 @@ H264RtpEncoder::H264RtpEncoder(uint32_t ui32Ssrc, void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { GET_CONFIG(uint32_t,cycleMS,Rtp::kCycleMS); auto ptr = frame->data() + frame->prefixSize(); - auto pts = frame->pts() % cycleMS; auto len = frame->size() - frame->prefixSize(); + auto pts = frame->pts() % cycleMS; auto nal_type = H264_TYPE(ptr[0]); - auto max_rtp_size = _ui32MtuSize - 2; + auto payload_size = _ui32MtuSize - 2; //超过MTU则按照FU-A模式打包 - if (len > max_rtp_size) { + if (len > payload_size + 1) { //最高位bit为forbidden_zero_bit, //后面2bit为nal_ref_idc(帧重要程度),00:可以丢,11:不能丢 //末尾5bit为nalu type,固定为28(FU-A) @@ -211,11 +211,10 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { bool mark_bit = false; int offset = 1; while (!mark_bit) { - if (len <= offset + max_rtp_size) { - //已经拆分结束 - max_rtp_size = len - offset; - mark_bit = true; + if (len <= offset + payload_size) { //FU-A end + mark_bit = true; + payload_size = len - offset; s_e_r_flags = (1 << 6) | nal_type; } else if (fu_a_start) { //FU-A start @@ -227,7 +226,7 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { { //传入nullptr先不做payload的内存拷贝 - auto rtp = makeRtp(getTrackType(), nullptr, max_rtp_size + 2, mark_bit, pts); + auto rtp = makeRtp(getTrackType(), nullptr, payload_size + 2, mark_bit, pts); //rtp payload 负载部分 uint8_t *payload = (uint8_t*)rtp->data() + rtp->offset; //FU-A 第1个字节 @@ -235,11 +234,11 @@ void H264RtpEncoder::inputFrame(const Frame::Ptr &frame) { //FU-A 第2个字节 payload[1] = s_e_r_flags; //H264 数据 - memcpy(payload + 2, (unsigned char *) ptr + offset, max_rtp_size); + memcpy(payload + 2, (unsigned char *) ptr + offset, payload_size); //输入到rtp环形缓存 RtpCodec::inputRtp(rtp, fu_a_start && nal_type == H264Frame::NAL_IDR); } - offset += max_rtp_size; + offset += payload_size; fu_a_start = false; } } else { diff --git a/src/Extension/H265Rtp.cpp b/src/Extension/H265Rtp.cpp index 461d8ed0..dc07b180 100644 --- a/src/Extension/H265Rtp.cpp +++ b/src/Extension/H265Rtp.cpp @@ -18,10 +18,10 @@ namespace mediakit{ //44 +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ //45 |F| Type | LayerId | TID | //46 +-------------+-----------------+ -//48 F = 0 -//49 Type = 49 (fragmentation unit (FU)) -//50 LayerId = 0 -//51 TID = 1 +//48 F = 0, 1bit +//49 Type = 49 (fragmentation unit (FU)), 6bit +//50 LayerId = 0, 6bit +//51 TID = 1, 3bit //56 /* //57 create the FU header //58 @@ -29,7 +29,6 @@ namespace mediakit{ //60 +-+-+-+-+-+-+-+-+ //61 |S|E| FuType | //62 +---------------+ -//63 //64 S = variable //65 E = variable //66 FuType = NAL unit type @@ -150,40 +149,39 @@ H265RtpEncoder::H265RtpEncoder(uint32_t ui32Ssrc, } void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { - GET_CONFIG(uint32_t,cycleMS,Rtp::kCycleMS); - uint8_t *pcData = (uint8_t*)frame->data() + frame->prefixSize(); - auto uiStamp = frame->pts(); - auto iLen = frame->size() - frame->prefixSize(); - unsigned char naluType = H265_TYPE(pcData[0]); //获取NALU的5bit 帧类型 - uiStamp %= cycleMS; + GET_CONFIG(uint32_t, cycleMS, Rtp::kCycleMS); + auto ptr = (uint8_t *) frame->data() + frame->prefixSize(); + auto len = frame->size() - frame->prefixSize(); + auto pts = frame->pts() % cycleMS; + auto nal_type = H265_TYPE(ptr[0]); //获取NALU的5bit 帧类型 + auto payload_size = _ui32MtuSize - 3; - int maxSize = _ui32MtuSize - 3; //超过MTU,按照FU方式打包 - if (iLen > maxSize) { + if (len > payload_size + 2) { //获取帧头数据,1byte unsigned char s_e_flags; - bool bFirst = true; - bool mark = false; - int nOffset = 2; - while (!mark) { - if (iLen <= nOffset + maxSize) { //是否拆分结束 - maxSize = iLen - nOffset; - mark = true; + bool fu_start = true; + bool mark_bit = false; + int offset = 2; + while (!mark_bit) { + if (len <= offset + payload_size) { //FU end - s_e_flags = (1 << 6) | naluType; - } else if (bFirst) { + mark_bit = true; + payload_size = len - offset; + s_e_flags = (1 << 6) | nal_type; + } else if (fu_start) { //FU start - s_e_flags = (1 << 7) | naluType; + s_e_flags = (1 << 7) | nal_type; } else { //FU mid - s_e_flags = naluType; + s_e_flags = nal_type; } { //传入nullptr先不做payload的内存拷贝 - auto rtp = makeRtp(getTrackType(), nullptr, maxSize + 3, mark, uiStamp); + auto rtp = makeRtp(getTrackType(), nullptr, payload_size + 3, mark_bit, pts); //rtp payload 负载部分 - uint8_t *payload = (uint8_t*)rtp->data() + rtp->offset; + uint8_t *payload = (uint8_t *) rtp->data() + rtp->offset; //FU 第1个字节,表明为FU payload[0] = 49 << 1; //FU 第2个字节貌似固定为1 @@ -191,16 +189,16 @@ void H265RtpEncoder::inputFrame(const Frame::Ptr &frame) { //FU 第3个字节 payload[2] = s_e_flags; //H265 数据 - memcpy(payload + 3,pcData + nOffset, maxSize); + memcpy(payload + 3, ptr + offset, payload_size); //输入到rtp环形缓存 - RtpCodec::inputRtp(rtp,bFirst && H265Frame::isKeyFrame(naluType)); + RtpCodec::inputRtp(rtp, fu_start && H265Frame::isKeyFrame(nal_type)); } - nOffset += maxSize; - bFirst = false; + offset += payload_size; + fu_start = false; } } else { - makeH265Rtp(naluType,pcData, iLen, false, true, uiStamp); + makeH265Rtp(nal_type, ptr, len, false, true, pts); } } diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h index 23b58fee..ef4af868 100644 --- a/src/FMP4/FMP4MediaSource.h +++ b/src/FMP4/FMP4MediaSource.h @@ -30,19 +30,8 @@ public: uint32_t time_stamp = 0; }; -//FMP4直播合并写策略类 -class FMP4FlushPolicy : public FlushPolicy{ -public: - FMP4FlushPolicy() = default; - ~FMP4FlushPolicy() = default; - - uint32_t getStamp(const FMP4Packet::Ptr &packet) { - return packet->time_stamp; - } -}; - //FMP4直播源 -class FMP4MediaSource : public MediaSource, public RingDelegate, public PacketCache{ +class FMP4MediaSource : public MediaSource, public RingDelegate, public PacketCache{ public: using Ptr = std::shared_ptr; using RingDataType = std::shared_ptr >; @@ -100,14 +89,15 @@ public: _have_video = true; } _speed += packet->size(); - PacketCache::inputPacket(true, std::move(packet), key); + auto stamp = packet->time_stamp; + PacketCache::inputPacket(stamp, true, std::move(packet), key); } /** * 情况GOP缓存 */ void clearCache() override { - PacketCache::clearCache(); + PacketCache::clearCache(); _ring->clearCache(); } diff --git a/src/FMP4/FMP4MediaSourceMuxer.h b/src/FMP4/FMP4MediaSourceMuxer.h index 50b0f83e..3d8482a0 100644 --- a/src/FMP4/FMP4MediaSourceMuxer.h +++ b/src/FMP4/FMP4MediaSourceMuxer.h @@ -32,7 +32,7 @@ public: ~FMP4MediaSourceMuxer() override = default; void setListener(const std::weak_ptr &listener){ - _listener = listener; + setDelegate(listener); _media_src->setListener(shared_from_this()); } diff --git a/src/Player/MediaPlayer.cpp b/src/Player/MediaPlayer.cpp index e77888f3..d766bf77 100644 --- a/src/Player/MediaPlayer.cpp +++ b/src/Player/MediaPlayer.cpp @@ -37,7 +37,7 @@ void MediaPlayer::play(const string &url) { _delegate->setOnShutdown(_shutdownCB); _delegate->setOnPlayResult(_playResultCB); _delegate->setOnResume(_resumeCB); - _delegate->setMediaSouce(_pMediaSrc); + _delegate->setMediaSource(_pMediaSrc); _delegate->mINI::operator=(*this); _delegate->play(url); } diff --git a/src/Player/PlayerBase.h b/src/Player/PlayerBase.h index 749a86aa..72a79b80 100644 --- a/src/Player/PlayerBase.h +++ b/src/Player/PlayerBase.h @@ -103,7 +103,7 @@ public: * 设置一个MediaSource,直接生产rtsp/rtmp代理 * @param src */ - virtual void setMediaSouce(const MediaSource::Ptr & src) {} + virtual void setMediaSource(const MediaSource::Ptr & src) {} /** * 获取丢包率,只支持rtsp @@ -181,9 +181,9 @@ public: return Parent::seekTo(fProgress); } - void setMediaSouce(const MediaSource::Ptr & src) override { + void setMediaSource(const MediaSource::Ptr & src) override { if (_delegate) { - _delegate->setMediaSouce(src); + _delegate->setMediaSource(src); } _pMediaSrc = src; } diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 78b8d5d6..03e4b8cf 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -93,6 +93,10 @@ void PlayerProxy::play(const string &strUrlTmp) { if(!strongSelf) { return; } + + //注销直接拉流代理产生的流:#532 + strongSelf->setMediaSource(nullptr); + if(strongSelf->_muxer) { auto tracks = strongSelf->MediaPlayer::getTracks(false); for (auto & track : tracks){ @@ -126,7 +130,7 @@ void PlayerProxy::play(const string &strUrlTmp) { mediaSource = std::make_shared(_vhost, _app, _stream_id); } if(mediaSource){ - setMediaSouce(mediaSource); + setMediaSource(mediaSource); mediaSource->setListener(shared_from_this()); } } @@ -163,7 +167,7 @@ bool PlayerProxy::close(MediaSource &sender,bool force) { return; } strongSelf->_muxer.reset(); - strongSelf->setMediaSouce(nullptr); + strongSelf->setMediaSource(nullptr); strongSelf->teardown(); if (strongSelf->_on_close) { strongSelf->_on_close(); diff --git a/src/Record/HlsRecorder.h b/src/Record/HlsRecorder.h index 3d1bc4f7..40b5c22c 100644 --- a/src/Record/HlsRecorder.h +++ b/src/Record/HlsRecorder.h @@ -34,7 +34,7 @@ public: } void setListener(const std::weak_ptr &listener) { - _listener = listener; + setDelegate(listener); _hls->getMediaSource()->setListener(shared_from_this()); //先注册媒体流,后续可以按需生成 _hls->getMediaSource()->registHls(false); diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index b538c003..8d309654 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -154,7 +154,8 @@ public: } bool key = pkt->isVideoKeyFrame(); bool is_video = pkt->type_id == MSG_VIDEO; - PacketCache::inputPacket(is_video, std::move(pkt), key); + auto stamp = pkt->time_stamp; + PacketCache::inputPacket(stamp, is_video, std::move(pkt), key); } /** diff --git a/src/Rtmp/RtmpMediaSourceImp.h b/src/Rtmp/RtmpMediaSourceImp.h index 24dc2c4f..f293a3a0 100644 --- a/src/Rtmp/RtmpMediaSourceImp.h +++ b/src/Rtmp/RtmpMediaSourceImp.h @@ -86,7 +86,7 @@ public: _muxer->setMediaListener(getListener()); _muxer->setTrackListener(static_pointer_cast(shared_from_this())); //让_muxer对象拦截一部分事件(比如说录像相关事件) - setListener(_muxer); + MediaSource::setListener(_muxer); for(auto &track : _demuxer->getTracks(false)){ _muxer->addTrack(track); @@ -119,6 +119,20 @@ public: } } + /** + * 设置事件监听器 + * @param listener 监听器 + */ + void setListener(const std::weak_ptr &listener) override{ + if (_muxer) { + //_muxer对象不能处理的事件再给listener处理 + _muxer->setMediaListener(listener); + } else { + //未创建_muxer对象,事件全部给listener处理 + MediaSource::setListener(listener); + } + } + private: bool _all_track_ready = false; bool _recreate_metadata = false; diff --git a/src/Rtmp/RtmpMediaSourceMuxer.h b/src/Rtmp/RtmpMediaSourceMuxer.h index e176beab..c8ef9652 100644 --- a/src/Rtmp/RtmpMediaSourceMuxer.h +++ b/src/Rtmp/RtmpMediaSourceMuxer.h @@ -32,7 +32,7 @@ public: ~RtmpMediaSourceMuxer() override{} void setListener(const std::weak_ptr &listener){ - _listener = listener; + setDelegate(listener); _media_src->setListener(shared_from_this()); } diff --git a/src/Rtp/GB28181Process.cpp b/src/Rtp/GB28181Process.cpp new file mode 100644 index 00000000..0006d2eb --- /dev/null +++ b/src/Rtp/GB28181Process.cpp @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#if defined(ENABLE_RTPPROXY) +#include "GB28181Process.h" +#include "Util/File.h" +#include "Http/HttpTSPlayer.h" +#include "Extension/CommonRtp.h" + +namespace mediakit{ + +//判断是否为ts负载 +static inline bool checkTS(const uint8_t *packet, int bytes){ + return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; +} + +GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface) { + assert(interface); + _media_info = media_info; + _interface = interface; +} + +GB28181Process::~GB28181Process() {} + +bool GB28181Process::inputRtp(bool, const char *data, int data_len) { + return handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len); +} + +void GB28181Process::onRtpSorted(const RtpPacket::Ptr &rtp, int) { + if (!_rtp_decoder) { + switch (rtp->PT) { + case 33: + case 96: { + //ts或ps负载 + _rtp_decoder = std::make_shared(CodecInvalid, 256 * 1024); + + //设置dump目录 + GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); + if (!dump_dir.empty()) { + auto save_path = File::absolutePath(_media_info._streamid + ".mp2", dump_dir); + _save_file_ps.reset(File::create_file(save_path.data(), "wb"), [](FILE *fp) { + if (fp) { + fclose(fp); + } + }); + } + break; + } + + default: + WarnL << "不支持的rtp负载类型:" << (int) rtp->PT; + return; + } + + //设置frame回调 + _rtp_decoder->addDelegate(std::make_shared([this](const Frame::Ptr &frame) { + onRtpDecode(frame); + })); + } + + //解码rtp + _rtp_decoder->inputRtp(rtp, false); +} + +const char *GB28181Process::onSearchPacketTail(const char *packet,int bytes){ + try { + auto ret = _decoder->input((uint8_t *) packet, bytes); + if (ret > 0) { + return packet + ret; + } + return nullptr; + } catch (std::exception &ex) { + InfoL << "解析ps或ts异常: bytes=" << bytes + << " ,exception=" << ex.what() + << " ,hex=" << hexdump((uint8_t *) packet, bytes); + if (remainDataSize() > 256 * 1024) { + //缓存太多数据无法处理则上抛异常 + throw; + } + return nullptr; + } +} + +void GB28181Process::onRtpDecode(const Frame::Ptr &frame) { + //这是TS或PS + if (_save_file_ps) { + fwrite(frame->data(), frame->size(), 1, _save_file_ps.get()); + } + + if (!_decoder) { + //创建解码器 + if (checkTS((uint8_t *) frame->data(), frame->size())) { + //猜测是ts负载 + InfoL << _media_info._streamid << " judged to be TS"; + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, _interface); + } else { + //猜测是ps负载 + InfoL << _media_info._streamid << " judged to be PS"; + _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, _interface); + } + } + + if (_decoder) { + HttpRequestSplitter::input(frame->data(), frame->size()); + } +} + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/GB28181Process.h b/src/Rtp/GB28181Process.h new file mode 100644 index 00000000..536af856 --- /dev/null +++ b/src/Rtp/GB28181Process.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_GB28181ROCESS_H +#define ZLMEDIAKIT_GB28181ROCESS_H + +#if defined(ENABLE_RTPPROXY) + +#include "Decoder.h" +#include "ProcessInterface.h" +#include "Rtsp/RtpCodec.h" +#include "Rtsp/RtpReceiver.h" +#include "Http/HttpRequestSplitter.h" + +namespace mediakit{ + +class GB28181Process : public HttpRequestSplitter, public RtpReceiver, public ProcessInterface{ +public: + typedef std::shared_ptr Ptr; + GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface); + ~GB28181Process() override; + + /** + * 输入rtp + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @return 是否解析成功 + */ + bool inputRtp(bool, const char *data, int data_len) override; + +protected: + void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; + const char *onSearchPacketTail(const char *data,int len) override; + int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }; + +private: + void onRtpDecode(const Frame::Ptr &frame); + +private: + MediaInfo _media_info; + DecoderImp::Ptr _decoder; + MediaSinkInterface *_interface; + std::shared_ptr _save_file_ps; + std::shared_ptr _rtp_decoder; +}; + +}//namespace mediakit +#endif//defined(ENABLE_RTPPROXY) +#endif //ZLMEDIAKIT_GB28181ROCESS_H diff --git a/src/Rtp/PSEncoder.cpp b/src/Rtp/PSEncoder.cpp index b6688af0..f2b47693 100644 --- a/src/Rtp/PSEncoder.cpp +++ b/src/Rtp/PSEncoder.cpp @@ -167,5 +167,43 @@ void PSEncoder::inputFrame(const Frame::Ptr &frame) { } } +//////////////////////////////////////////////////////////////////////////////////////////////// + +class RingDelegateHelper : public RingDelegate { +public: + typedef function onRtp; + + ~RingDelegateHelper() override{} + RingDelegateHelper(onRtp on_rtp){ + _on_rtp = std::move(on_rtp); + } + void onWrite(RtpPacket::Ptr in, bool is_key) override{ + _on_rtp(std::move(in), is_key); + } + +private: + onRtp _on_rtp; +}; + +//////////////////////////////////////////////////////////////////////////////////////////////////////////////// + +PSEncoderImp::PSEncoderImp(uint32_t ssrc, uint8_t payload_type) { + GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); + _rtp_encoder = std::make_shared(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0); + _rtp_encoder->setRtpRing(std::make_shared()); + _rtp_encoder->getRtpRing()->setDelegate(std::make_shared([this](RtpPacket::Ptr rtp, bool is_key){ + onRTP(std::move(rtp)); + })); + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +PSEncoderImp::~PSEncoderImp() { + InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +} + +void PSEncoderImp::onPS(uint32_t stamp, void *packet, size_t bytes) { + _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) diff --git a/src/Rtp/PSEncoder.h b/src/Rtp/PSEncoder.h index 5b63d3fc..4e643cc6 100644 --- a/src/Rtp/PSEncoder.h +++ b/src/Rtp/PSEncoder.h @@ -14,6 +14,7 @@ #include "mpeg-ps.h" #include "Common/MediaSink.h" #include "Common/Stamp.h" +#include "Extension/CommonRtp.h" namespace mediakit{ //该类实现mpeg-ps容器格式的打包 @@ -65,6 +66,22 @@ private: unordered_map _codec_to_trackid; }; +class PSEncoderImp : public PSEncoder{ +public: + PSEncoderImp(uint32_t ssrc, uint8_t payload_type = 96); + ~PSEncoderImp() override; + +protected: + //rtp打包后回调 + virtual void onRTP(Buffer::Ptr rtp) = 0; + +protected: + void onPS(uint32_t stamp, void *packet, size_t bytes) override; + +private: + std::shared_ptr _rtp_encoder; +}; + }//namespace mediakit #endif //ENABLE_RTPPROXY #endif //ZLMEDIAKIT_PSENCODER_H diff --git a/src/Rtp/ProcessInterface.h b/src/Rtp/ProcessInterface.h new file mode 100644 index 00000000..c15879e1 --- /dev/null +++ b/src/Rtp/ProcessInterface.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + + +#ifndef ZLMEDIAKIT_PROCESSINTERFACE_H +#define ZLMEDIAKIT_PROCESSINTERFACE_H + +#include +#include + +namespace mediakit { + +class ProcessInterface { +public: + using Ptr = std::shared_ptr; + ProcessInterface() = default; + virtual ~ProcessInterface() = default; + + /** + * 输入rtp + * @param is_udp 是否为udp模式 + * @param data rtp数据指针 + * @param data_len rtp数据长度 + * @return 是否解析成功 + */ + virtual bool inputRtp(bool is_udp, const char *data, int data_len) = 0; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_PROCESSINTERFACE_H diff --git a/src/Rtp/RtpCache.cpp b/src/Rtp/RtpCache.cpp new file mode 100644 index 00000000..59ce4294 --- /dev/null +++ b/src/Rtp/RtpCache.cpp @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "RtpCache.h" + +namespace mediakit{ + +RtpCache::RtpCache(onFlushed cb) { + _cb = std::move(cb); +} + +void RtpCache::onFlush(std::shared_ptr > rtp_list, bool) { + _cb(std::move(rtp_list)); +} + +void RtpCache::input(uint64_t stamp, Buffer::Ptr buffer) { + inputPacket(stamp, true, std::move(buffer), false); +} + +void RtpCachePS::onRTP(Buffer::Ptr buffer) { + auto rtp = static_pointer_cast(buffer); + auto stamp = rtp->timeStamp; + input(stamp, std::move(buffer)); +} + +}//namespace mediakit diff --git a/src/Rtp/RtpCache.h b/src/Rtp/RtpCache.h new file mode 100644 index 00000000..b6d00e6a --- /dev/null +++ b/src/Rtp/RtpCache.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_RTPCACHE_H +#define ZLMEDIAKIT_RTPCACHE_H + +#include "PSEncoder.h" +#include "Extension/CommonRtp.h" + +namespace mediakit{ + +class RtpCache : public PacketCache { +public: + using onFlushed = function >)>; + RtpCache(onFlushed cb); + ~RtpCache() override = default; + +protected: + /** + * 输入rtp(目的是为了合并写) + * @param buffer rtp数据 + */ + void input(uint64_t stamp, Buffer::Ptr buffer); + +protected: + void onFlush(std::shared_ptr > rtp_list, bool) override; + +private: + onFlushed _cb; +}; + +class RtpCachePS : public RtpCache, public PSEncoderImp{ +public: + RtpCachePS(onFlushed cb, uint32_t ssrc, uint8_t payload_type = 96) : RtpCache(std::move(cb)), PSEncoderImp(ssrc, payload_type) {}; + ~RtpCachePS() override = default; + +protected: + void onRTP(Buffer::Ptr rtp) override; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_RTPCACHE_H diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 2764dfae..58398c54 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -9,14 +9,17 @@ */ #if defined(ENABLE_RTPPROXY) +#include "GB28181Process.h" #include "RtpProcess.h" +#include "RtpSplitter.h" #include "Util/File.h" #include "Http/HttpTSPlayer.h" + #define RTP_APP_NAME "rtp" -namespace mediakit{ +namespace mediakit { -static string printAddress(const struct sockaddr *addr){ +static string printAddress(const struct sockaddr *addr) { return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port); } @@ -26,20 +29,11 @@ RtpProcess::RtpProcess(const string &stream_id) { _media_info._app = RTP_APP_NAME; _media_info._streamid = stream_id; - GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir); + GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); { FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_rtp.reset(fp,[](FILE *fp){ - fclose(fp); - }); - } - } - - { - FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_ps.reset(fp,[](FILE *fp){ + if (fp) { + _save_file_rtp.reset(fp, [](FILE *fp) { fclose(fp); }); } @@ -47,20 +41,16 @@ RtpProcess::RtpProcess(const string &stream_id) { { FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr; - if(fp){ - _save_file_video.reset(fp,[](FILE *fp){ + if (fp) { + _save_file_video.reset(fp, [](FILE *fp) { fclose(fp); }); } } - _rtp_decoder = std::make_shared(CodecInvalid, 256 * 1024); - _rtp_decoder->addDelegate(std::make_shared([this](const Frame::Ptr &frame){ - onRtpDecode((uint8_t *) frame->data(), frame->size(), frame->dts()); - })); } RtpProcess::~RtpProcess() { - uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000; + uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000; WarnP(this) << "RTP推流器(" << _media_info._vhost << "/" << _media_info._app << "/" @@ -79,95 +69,47 @@ RtpProcess::~RtpProcess() { } } -bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) { - GET_CONFIG(bool,check_source,RtpProxy::kCheckSource); +bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr, uint32_t *dts_out) { + GET_CONFIG(bool, check_source, RtpProxy::kCheckSource); //检查源是否合法 - if(!_addr){ + if (!_addr) { _addr = new struct sockaddr; _sock = sock; - memcpy(_addr,addr, sizeof(struct sockaddr)); + memcpy(_addr, addr, sizeof(struct sockaddr)); DebugP(this) << "bind to address:" << printAddress(_addr); //推流鉴权 emitOnPublish(); } - if(!_muxer){ + if (!_muxer) { //无权限推流 return false; } - if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){ + if (check_source && memcmp(_addr, addr, sizeof(struct sockaddr)) != 0) { DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr); return false; } - _total_bytes += data_len; - bool ret = handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len); - if(dts_out){ + _total_bytes += len; + if (_save_file_rtp) { + uint16_t size = len; + size = htons(size); + fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get()); + fwrite((uint8_t *) data, len, 1, _save_file_rtp.get()); + } + if (!_process) { + _process = std::make_shared(_media_info, this); + } + bool ret = _process ? _process->inputRtp(is_udp, data, len) : false; + if (dts_out) { *dts_out = _dts; } return ret; } -//判断是否为ts负载 -static inline bool checkTS(const uint8_t *packet, int bytes){ - return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE; -} - -void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) { - if(rtp->sequence != (uint16_t)(_sequence + 1) && _sequence != 0){ - WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流"; - } - _sequence = rtp->sequence; - if(_save_file_rtp){ - uint16_t size = rtp->size() - 4; - size = htons(size); - fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get()); - fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get()); - } - _rtp_decoder->inputRtp(rtp); -} - -const char *RtpProcess::onSearchPacketTail(const char *packet,int bytes){ - try { - auto ret = _decoder->input((uint8_t *) packet, bytes); - if (ret > 0) { - return packet + ret; - } - return nullptr; - } catch (std::exception &ex) { - InfoP(this) << "解析ps或ts异常: bytes=" << bytes - << " ,exception=" << ex.what(); - //<< " ,hex=" << hexdump((uint8_t *) packet, bytes); - return nullptr; - } -} - -void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp) { - if(_save_file_ps){ - fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get()); - } - - if (!_decoder) { - //创建解码器 - if (checkTS(packet, bytes)) { - //猜测是ts负载 - InfoP(this) << "judged to be TS"; - _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this); - } else { - //猜测是ps负载 - InfoP(this) << "judged to be PS"; - _decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, this); - } - } - - if (_decoder) { - HttpRequestSplitter::input((char *) packet, bytes); - } -} - -void RtpProcess::inputFrame(const Frame::Ptr &frame){ - _last_rtp_time.resetTime(); +void RtpProcess::inputFrame(const Frame::Ptr &frame) { + _last_frame_time.resetTime(); _dts = frame->dts(); if (_save_file_video && frame->getTrackType() == TrackVideo) { fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get()); @@ -175,7 +117,7 @@ void RtpProcess::inputFrame(const Frame::Ptr &frame){ _muxer->inputFrame(frame); } -void RtpProcess::addTrack(const Track::Ptr & track){ +void RtpProcess::addTrack(const Track::Ptr &track) { _muxer->addTrack(track); } @@ -190,14 +132,14 @@ bool RtpProcess::alive() { } GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec) - if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){ + if(_last_frame_time.elapsedTime() / 1000 < timeoutSec){ return true; } return false; } -void RtpProcess::onDetach(){ - if(_on_detach){ +void RtpProcess::onDetach() { + if (_on_detach) { _on_detach(); } } @@ -207,47 +149,43 @@ void RtpProcess::setOnDetach(const function &cb) { } string RtpProcess::get_peer_ip() { - if(_addr){ + if (_addr) { return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr); } return "0.0.0.0"; } uint16_t RtpProcess::get_peer_port() { - if(!_addr){ + if (!_addr) { return 0; } return ntohs(((struct sockaddr_in *) _addr)->sin_port); } string RtpProcess::get_local_ip() { - if(_sock){ + if (_sock) { return _sock->get_local_ip(); } return "0.0.0.0"; } uint16_t RtpProcess::get_local_port() { - if(_sock){ - return _sock->get_local_port(); + if (_sock) { + return _sock->get_local_port(); } return 0; } -string RtpProcess::getIdentifier() const{ +string RtpProcess::getIdentifier() const { return _media_info._streamid; } -int RtpProcess::totalReaderCount(){ +int RtpProcess::totalReaderCount() { return _muxer ? _muxer->totalReaderCount() : 0; } -void RtpProcess::setListener(const std::weak_ptr &listener){ - if(_muxer){ - _muxer->setMediaListener(listener); - }else{ - _listener = listener; - } +void RtpProcess::setListener(const std::weak_ptr &listener) { + setDelegate(listener); } void RtpProcess::setRtpPause(bool pause) @@ -268,7 +206,7 @@ void RtpProcess::emitOnPublish() { strongSelf->_media_info._app, strongSelf->_media_info._streamid, 0, true, true, enableHls, enableMP4); - strongSelf->_muxer->setMediaListener(strongSelf->_listener); + strongSelf->_muxer->setMediaListener(strongSelf); InfoP(strongSelf) << "允许RTP推流"; } else { WarnP(strongSelf) << "禁止RTP推流:" << err; @@ -277,7 +215,7 @@ void RtpProcess::emitOnPublish() { //触发推流鉴权事件 auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast(*this)); - if(!flag){ + if (!flag) { //该事件无人监听,默认不鉴权 GET_CONFIG(bool, toHls, General::kPublishToHls); GET_CONFIG(bool, toMP4, General::kPublishToMP4); @@ -285,5 +223,17 @@ void RtpProcess::emitOnPublish() { } } +MediaOriginType RtpProcess::getOriginType(MediaSource &sender) const{ + return MediaOriginType::rtp_push; +} + +string RtpProcess::getOriginUrl(MediaSource &sender) const { + return _media_info._full_url; +} + +std::shared_ptr RtpProcess::getOriginSock(MediaSource &sender) const{ + return const_cast(this)->shared_from_this(); +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index 30d9f404..5446fbcd 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -12,18 +12,14 @@ #define ZLMEDIAKIT_RTPPROCESS_H #if defined(ENABLE_RTPPROXY) +#include "ProcessInterface.h" +#include "Common/MultiMediaSourceMuxer.h" -#include "Rtsp/RtpReceiver.h" -#include "Decoder.h" -#include "Common/Device.h" -#include "Common/Stamp.h" -#include "Http/HttpRequestSplitter.h" -#include "Extension/CommonRtp.h" using namespace mediakit; -namespace mediakit{ +namespace mediakit { -class RtpProcess : public HttpRequestSplitter, public RtpReceiver, public SockInfo, public MediaSinkInterface, public std::enable_shared_from_this{ +class RtpProcess : public SockInfo, public MediaSinkInterface, public MediaSourceEventInterceptor, public std::enable_shared_from_this{ public: typedef std::shared_ptr Ptr; friend class RtpProcessHelper; @@ -32,14 +28,15 @@ public: /** * 输入rtp + * @param is_udp 是否为udp模式 * @param sock 本地监听的socket * @param data rtp数据指针 - * @param data_len rtp数据长度 + * @param len rtp数据长度 * @param addr 数据源地址 * @param dts_out 解析出最新的dts * @return 是否解析成功 */ - bool inputRtp(const Socket::Ptr &sock, const char *data,int data_len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); + bool inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr , uint32_t *dts_out = nullptr); /** * 是否超时,用于超时移除对象 @@ -72,34 +69,30 @@ public: void setRtpPause(bool pause); protected: - void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override ; void inputFrame(const Frame::Ptr &frame) override; void addTrack(const Track::Ptr & track) override; void resetTracks() override {}; - const char *onSearchPacketTail(const char *data,int len) override; - int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }; + //// MediaSourceEvent override //// + MediaOriginType getOriginType(MediaSource &sender) const override; + string getOriginUrl(MediaSource &sender) const override; + std::shared_ptr getOriginSock(MediaSource &sender) const override; private: void emitOnPublish(); - void onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp); private: - std::shared_ptr _rtp_decoder; - std::shared_ptr _save_file_rtp; - std::shared_ptr _save_file_ps; - std::shared_ptr _save_file_video; - struct sockaddr *_addr = nullptr; - uint16_t _sequence = 0; - MultiMediaSourceMuxer::Ptr _muxer; - Ticker _last_rtp_time; uint32_t _dts = 0; - DecoderImp::Ptr _decoder; - std::weak_ptr _listener; - MediaInfo _media_info; uint64_t _total_bytes = 0; + struct sockaddr *_addr = nullptr; Socket::Ptr _sock; + MediaInfo _media_info; + Ticker _last_frame_time; function _on_detach; + std::shared_ptr _save_file_rtp; + std::shared_ptr _save_file_video; + ProcessInterface::Ptr _process; + MultiMediaSourceMuxer::Ptr _muxer; bool _paused = false; Ticker _pause_rtp_time; diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 0ab553f2..a589f843 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -9,25 +9,34 @@ */ #if defined(ENABLE_RTPPROXY) +#include #include "RtpSelector.h" +#include "RtpSplitter.h" namespace mediakit{ INSTANCE_IMP(RtpSelector); +void RtpSelector::clear(){ + lock_guard lck(_mtx_map); + _map_rtp_process.clear(); +} + bool RtpSelector::inputRtp(const Socket::Ptr &sock, const char *data, int data_len, const struct sockaddr *addr,uint32_t *dts_out) { - //使用ssrc为流id uint32_t ssrc = 0; if (!getSSRC(data, data_len, ssrc)) { WarnL << "get ssrc from rtp failed:" << data_len; return false; } - - //假定指定了流id,那么通过流id来区分是否为一路流(哪怕可能同时收到多路流) auto process = getProcess(printSSRC(ssrc), true); if (process) { - return process->inputRtp(sock, data, data_len, addr, dts_out); + try { + return process->inputRtp(true, sock, data, data_len, addr, dts_out); + } catch (...) { + delProcess(printSSRC(ssrc), process.get()); + throw; + } } return false; } @@ -145,18 +154,6 @@ int RtpProcessHelper::totalReaderCount(MediaSource &sender) { return _process ? _process->totalReaderCount() : sender.totalReaderCount(); } -MediaOriginType RtpProcessHelper::getOriginType(MediaSource &sender) const{ - return MediaOriginType::rtp_push; -} - -string RtpProcessHelper::getOriginUrl(MediaSource &sender) const { - return _process ? _process->_media_info._full_url : ""; -} - -std::shared_ptr RtpProcessHelper::getOriginSock(MediaSource &sender) const{ - return _process; -} - RtpProcess::Ptr &RtpProcessHelper::getProcess() { return _process; } diff --git a/src/Rtp/RtpSelector.h b/src/Rtp/RtpSelector.h index 33a01256..fbe61e9c 100644 --- a/src/Rtp/RtpSelector.h +++ b/src/Rtp/RtpSelector.h @@ -34,12 +34,6 @@ protected: bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; - // 获取媒体源类型 - MediaOriginType getOriginType(MediaSource &sender) const override; - // 获取媒体源url或者文件路径 - string getOriginUrl(MediaSource &sender) const override; - // 获取媒体源客户端相关信息 - std::shared_ptr getOriginSock(MediaSource &sender) const override; private: weak_ptr _parent; @@ -55,6 +49,11 @@ public: static bool getSSRC(const char *data,int data_len, uint32_t &ssrc); static RtpSelector &Instance(); + /** + * 清空所有对象 + */ + void clear(); + /** * 输入多个rtp流,根据ssrc分流 * @param sock 本地socket diff --git a/src/Rtp/PSRtpSender.cpp b/src/Rtp/RtpSender.cpp similarity index 66% rename from src/Rtp/PSRtpSender.cpp rename to src/Rtp/RtpSender.cpp index 7f0a335a..8fdb531a 100644 --- a/src/Rtp/PSRtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -9,33 +9,29 @@ */ #if defined(ENABLE_RTPPROXY) -#include "PSRtpSender.h" +#include "RtpSender.h" #include "Rtsp/RtspSession.h" #include "Thread/WorkThreadPool.h" +#include "RtpCache.h" namespace mediakit{ -PSRtpSender::PSRtpSender(uint32_t ssrc, uint8_t payload_type) { - GET_CONFIG(uint32_t,video_mtu,Rtp::kVideoMtuSize); - _rtp_encoder = std::make_shared(CodecInvalid, ssrc, video_mtu, 90000, payload_type, 0); - _rtp_encoder->setRtpRing(std::make_shared()); - _rtp_encoder->getRtpRing()->setDelegate(std::make_shared([this](const RtpPacket::Ptr &rtp, bool is_key){ - onRtp(rtp, is_key); - })); +RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { _poller = EventPollerPool::Instance().getPoller(); - InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); + _interface = std::make_shared([this](std::shared_ptr > list) { + onFlushRtpList(std::move(list)); + }, ssrc, payload_type); } -PSRtpSender::~PSRtpSender() { - InfoL << this << " " << printSSRC(_rtp_encoder->getSsrc()); +RtpSender::~RtpSender() { } -void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ +void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; _dst_port = dst_port; - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); if (is_udp) { _socket->bindUdpSock(0); auto poller = _poller; @@ -73,7 +69,7 @@ void PSRtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_ud } } -void PSRtpSender::onConnect(){ +void RtpSender::onConnect(){ _is_connect = true; //加大发送缓存,防止udp丢包之类的问题 SockUtil::setSendBuf(_socket->rawFD(), 4 * 1024 * 1024); @@ -83,37 +79,38 @@ void PSRtpSender::onConnect(){ _socket->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE); } //连接建立成功事件 - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); _socket->setOnErr([weak_self](const SockException &err) { auto strong_self = weak_self.lock(); if (strong_self) { strong_self->onErr(err); } }); - InfoL << "开始发送 ps rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; + InfoL << "开始发送 rtp:" << _socket->get_peer_ip() << ":" << _socket->get_peer_port() << ", 是否为udp方式:" << _is_udp; +} + +void RtpSender::addTrack(const Track::Ptr &track){ + _interface->addTrack(track); +} + +void RtpSender::addTrackCompleted(){ + _interface->addTrackCompleted(); +} + +void RtpSender::resetTracks(){ + _interface->resetTracks(); } //此函数在其他线程执行 -void PSRtpSender::inputFrame(const Frame::Ptr &frame) { +void RtpSender::inputFrame(const Frame::Ptr &frame) { if (_is_connect) { //连接成功后才做实质操作(节省cpu资源) - PSEncoder::inputFrame(frame); + _interface->inputFrame(frame); } } //此函数在其他线程执行 -void PSRtpSender::onPS(uint32_t stamp, void *packet, size_t bytes) { - _rtp_encoder->inputFrame(std::make_shared((char *) packet, bytes, stamp)); -} - -//此函数在其他线程执行 -void PSRtpSender::onRtp(const RtpPacket::Ptr &rtp, bool) { - //开启合并写提高发送性能 - PacketCache::inputPacket(true, rtp, false); -} - -//此函数在其他线程执行 -void PSRtpSender::onFlush(shared_ptr > rtp_list, bool) { +void RtpSender::onFlushRtpList(shared_ptr > rtp_list) { if(!_is_connect){ //连接成功后才能发送数据 return; @@ -124,29 +121,29 @@ void PSRtpSender::onFlush(shared_ptr > rtp_list, bool) { _poller->async([rtp_list, is_udp, socket]() { int i = 0; int size = rtp_list->size(); - rtp_list->for_each([&](const RtpPacket::Ptr &packet) { + rtp_list->for_each([&](Buffer::Ptr &packet) { if (is_udp) { //udp模式,rtp over tcp前4个字节可以忽略 - socket->send(std::make_shared(packet, 4), nullptr, 0, ++i == size); + socket->send(std::make_shared(std::move(packet), 4), nullptr, 0, ++i == size); } else { //tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 - socket->send(std::make_shared(packet, 2), nullptr, 0, ++i == size); + socket->send(std::make_shared(std::move(packet), 2), nullptr, 0, ++i == size); } }); }); } -void PSRtpSender::onErr(const SockException &ex, bool is_connect) { +void RtpSender::onErr(const SockException &ex, bool is_connect) { _is_connect = false; //监听socket断开事件,方便重连 if (is_connect) { WarnL << "重连" << _dst_url << ":" << _dst_port << "失败, 原因为:" << ex.what(); } else { - WarnL << "停止发送 ps rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); + WarnL << "停止发送 rtp:" << _dst_url << ":" << _dst_port << ", 原因为:" << ex.what(); } - weak_ptr weak_self = shared_from_this(); + weak_ptr weak_self = shared_from_this(); _connect_timer = std::make_shared(10.0, [weak_self]() { auto strong_self = weak_self.lock(); if (!strong_self) { diff --git a/src/Rtp/PSRtpSender.h b/src/Rtp/RtpSender.h similarity index 53% rename from src/Rtp/PSRtpSender.h rename to src/Rtp/RtpSender.h index da817905..86a58894 100644 --- a/src/Rtp/PSRtpSender.h +++ b/src/Rtp/RtpSender.h @@ -8,42 +8,27 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#ifndef ZLMEDIAKIT_PSRTPSENDER_H -#define ZLMEDIAKIT_PSRTPSENDER_H +#ifndef ZLMEDIAKIT_RTPSENDER_H +#define ZLMEDIAKIT_RTPSENDER_H #if defined(ENABLE_RTPPROXY) #include "PSEncoder.h" #include "Extension/CommonRtp.h" namespace mediakit{ -class RingDelegateHelper : public RingDelegate { +//rtp发送客户端,支持发送GB28181协议 +class RtpSender : public MediaSinkInterface, public std::enable_shared_from_this{ public: - typedef function onRtp; + typedef std::shared_ptr Ptr; - ~RingDelegateHelper() override{} - RingDelegateHelper(onRtp on_rtp){ - _on_rtp = std::move(on_rtp); - } - void onWrite(RtpPacket::Ptr in, bool is_key) override{ - _on_rtp(in, is_key); - } - -private: - onRtp _on_rtp; -}; - -//该类在PSEncoder的基础上,实现了mpeg-ps的rtp打包以及发送 -class PSRtpSender : public PSEncoder, public std::enable_shared_from_this, public PacketCache{ -public: - typedef std::shared_ptr Ptr; + ~RtpSender() override; /** - * 构造函数 + * 构造函数,创建GB28181 RTP发送客户端 * @param ssrc rtp的ssrc * @param payload_type 国标中ps-rtp的pt一般为96 */ - PSRtpSender(uint32_t ssrc, uint8_t payload_type = 96); - ~PSRtpSender() override; + RtpSender(uint32_t ssrc, uint8_t payload_type = 96); /** * 开始发送ps-rtp包 @@ -59,20 +44,26 @@ public: */ void inputFrame(const Frame::Ptr &frame) override; -protected: - //mpeg-ps回调 - void onPS(uint32_t stamp, void *packet, size_t bytes) override; + /** + * 添加track,内部会调用Track的clone方法 + * 只会克隆sps pps这些信息 ,而不会克隆Delegate相关关系 + * @param track + */ + virtual void addTrack(const Track::Ptr & track) override; /** - * 批量flush rtp包时触发该函数 - * @param rtp_list rtp包列表 - * @param key_pos 是否包含关键帧 + * 添加所有Track完毕 */ - void onFlush(std::shared_ptr > rtp_list, bool key_pos) override; + virtual void addTrackCompleted() override; + + /** + * 重置track + */ + virtual void resetTracks() override; private: - //rtp打包后回调 - void onRtp(const RtpPacket::Ptr &in, bool is_key); + //合并写输出 + void onFlushRtpList(std::shared_ptr > rtp_list); //udp/tcp连接成功回调 void onConnect(); //异常断开socket事件 @@ -86,9 +77,9 @@ private: Socket::Ptr _socket; EventPoller::Ptr _poller; Timer::Ptr _connect_timer; - std::shared_ptr _rtp_encoder; + MediaSinkInterface::Ptr _interface; }; }//namespace mediakit #endif// defined(ENABLE_RTPPROXY) -#endif //ZLMEDIAKIT_PSRTPSENDER_H +#endif //ZLMEDIAKIT_RTPSENDER_H diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index d417e148..16196fdf 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -52,7 +52,7 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, bool enable //指定了流id,那么一个端口一个流(不管是否包含多个ssrc的多个流,绑定rtp源后,会筛选掉ip端口不匹配的流) process = RtpSelector::Instance().getProcess(stream_id, true); udp_server->setOnRead([udp_server, process](const Buffer::Ptr &buf, struct sockaddr *addr, int) { - process->inputRtp(udp_server, buf->data(), buf->size(), addr); + process->inputRtp(true, udp_server, buf->data(), buf->size(), addr); }); } else { //未指定流id,一个端口多个流,通过ssrc来分流 diff --git a/src/Rtp/RtpSession.cpp b/src/Rtp/RtpSession.cpp index 9104ffdd..21fb5e2d 100644 --- a/src/Rtp/RtpSession.cpp +++ b/src/Rtp/RtpSession.cpp @@ -73,7 +73,7 @@ void RtpSession::onRtpPacket(const char *data, uint64_t len) { _process = RtpSelector::Instance().getProcess(_stream_id, true); _process->setListener(dynamic_pointer_cast(shared_from_this())); } - _process->inputRtp(getSock(), data, len, &addr); + _process->inputRtp(false, getSock(), data, len, &addr); _ticker.resetTime(); } diff --git a/src/Rtp/RtpSession.h b/src/Rtp/RtpSession.h index 07c9166b..9c411b9a 100644 --- a/src/Rtp/RtpSession.h +++ b/src/Rtp/RtpSession.h @@ -35,13 +35,14 @@ protected: bool close(MediaSource &sender,bool force) override; // 观看总人数 int totalReaderCount(MediaSource &sender) override; - void onRtpPacket(const char *data,uint64_t len) override; + // 收到rtp回调 + void onRtpPacket(const char *data, uint64_t len) override; private: - RtpProcess::Ptr _process; Ticker _ticker; - struct sockaddr addr; string _stream_id; + struct sockaddr addr; + RtpProcess::Ptr _process; }; }//namespace mediakit diff --git a/src/Rtp/RtpSplitter.cpp b/src/Rtp/RtpSplitter.cpp index 653c89ac..74b20c2e 100644 --- a/src/Rtp/RtpSplitter.cpp +++ b/src/Rtp/RtpSplitter.cpp @@ -9,18 +9,57 @@ */ #if defined(ENABLE_RTPPROXY) +#include +#include #include "RtpSplitter.h" namespace mediakit{ -RtpSplitter::RtpSplitter() {} +static const char kEHOME_MAGIC[] = "\x01\x00\x01\x00"; +static const int kEHOME_OFFSET = 256; +RtpSplitter::RtpSplitter() {} RtpSplitter::~RtpSplitter() {} +int64_t RtpSplitter::onRecvHeader(const char *data,uint64_t len){ + //忽略偏移量 + data += _offset; + len -= _offset; + + if (_offset == kEHOME_OFFSET + 4 && len > 12 && data[12] == '\r') { + //这是ehome,移除第12个字节 + memmove((char *) data + 1, data, 12); + data += 1; + len -= 1; + } + onRtpPacket(data, len); + return 0; +} + +static bool isEhome(const char *data, int len){ + if (len < 4) { + return false; + } + return memcmp(data, kEHOME_MAGIC, sizeof(kEHOME_MAGIC) - 1) == 0; +} + const char *RtpSplitter::onSearchPacketTail(const char *data, int len) { if (len < 4) { //数据不够 return nullptr; } + + if (isEhome(data, len)) { + //是ehome协议 + if (len < kEHOME_OFFSET + 4) { + //数据不够 + return nullptr; + } + //忽略ehome私有头后是rtsp样式的rtp,多4个字节, + _offset = kEHOME_OFFSET + 4; + //忽略ehome私有头 + return onSearchPacketTail_l(data + kEHOME_OFFSET + 2, len - kEHOME_OFFSET - 2); + } + if (data[0] == '$') { //可能是4个字节的rtp头 _offset = 4; @@ -42,10 +81,5 @@ const char *RtpSplitter::onSearchPacketTail_l(const char *data, int len) { return data + 2 + length; } -int64_t RtpSplitter::onRecvHeader(const char *data, uint64_t len) { - onRtpPacket(data + _offset, len - _offset); - return 0; -} - }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpSplitter.h b/src/Rtp/RtpSplitter.h index fd3f7370..d81cdd0e 100644 --- a/src/Rtp/RtpSplitter.h +++ b/src/Rtp/RtpSplitter.h @@ -19,18 +19,20 @@ namespace mediakit{ class RtpSplitter : public HttpRequestSplitter{ public: RtpSplitter(); - virtual ~RtpSplitter(); + ~RtpSplitter() override; protected: /** * 收到rtp包回调 + * @param data RTP包数据指针 + * @param len RTP包数据长度 */ - virtual void onRtpPacket(const char *data,uint64_t len) = 0; + virtual void onRtpPacket(const char *data, uint64_t len) = 0; protected: - const char *onSearchPacketTail(const char *data,int len) override ; - const char *onSearchPacketTail_l(const char *data,int len); - int64_t onRecvHeader(const char *data,uint64_t len) override; + int64_t onRecvHeader(const char *data, uint64_t len) override; + const char *onSearchPacketTail(const char *data, int len) override; + const char *onSearchPacketTail_l(const char *data, int len); private: int _offset = 0; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 8aee75d0..3ae03e01 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -36,7 +36,11 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, return false; } + uint32_t version = rtp_raw_ptr[0] >> 6; uint8_t padding = 0; + uint8_t ext = rtp_raw_ptr[0] & 0x10; + uint8_t csrc = rtp_raw_ptr[0] & 0x0f; + if (rtp_raw_ptr[0] & 0x20) { //获取padding大小 padding = rtp_raw_ptr[rtp_raw_len - 1]; @@ -46,6 +50,10 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, rtp_raw_len -= padding; } + if (version != 2) { + throw std::invalid_argument("非法的rtp,version != 2"); + } + auto rtp_ptr = _rtp_pool.obtain(); auto &rtp = *rtp_ptr; @@ -95,8 +103,6 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, //获取rtp中媒体数据偏移量 rtp.offset = 12 + 4; - int csrc = rtp_raw_ptr[0] & 0x0f; - int ext = rtp_raw_ptr[0] & 0x10; rtp.offset += 4 * csrc; if (ext && rtp_raw_len >= rtp.offset) { /* calculate the header extension length (stored as number of 32-bit words) */ @@ -124,13 +130,11 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, payload_ptr[3] = (rtp_raw_len & 0x00FF); //拷贝rtp负载 memcpy(payload_ptr + 4, rtp_raw_ptr, rtp_raw_len); - //排序rtp - sortRtp(std::move(rtp_ptr), track_index); - return true; -} -void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ - _rtp_sortor[track_index].sortPacket(rtp->sequence, rtp); + //排序rtp + auto seq = rtp_ptr->sequence; + _rtp_sortor[track_index].sortPacket(seq, std::move(rtp_ptr)); + return true; } void RtpReceiver::clear() { diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index a7c2a2e8..14d6d991 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -187,9 +187,6 @@ protected: int getJitterSize(int track_index); int getCycleCount(int track_index); -private: - void sortRtp(const RtpPacket::Ptr &rtp , int track_index); - private: uint32_t _ssrc[2] = {0, 0}; //ssrc不匹配计数 diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index cafeaf33..869eae73 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -183,7 +183,8 @@ public: } } bool is_video = rtp->type == TrackVideo; - PacketCache::inputPacket(is_video, std::move(rtp), keyPos); + auto stamp = rtp->timeStamp; + PacketCache::inputPacket(stamp, is_video, std::move(rtp), keyPos); } void clearCache() override{ diff --git a/src/Rtsp/RtspMediaSourceImp.h b/src/Rtsp/RtspMediaSourceImp.h index a2d2cbc2..a5cf5130 100644 --- a/src/Rtsp/RtspMediaSourceImp.h +++ b/src/Rtsp/RtspMediaSourceImp.h @@ -77,7 +77,7 @@ public: _muxer->setMediaListener(getListener()); _muxer->setTrackListener(static_pointer_cast(shared_from_this())); //让_muxer对象拦截一部分事件(比如说录像相关事件) - setListener(_muxer); + MediaSource::setListener(_muxer); for(auto &track : _demuxer->getTracks(false)){ _muxer->addTrack(track); @@ -102,6 +102,20 @@ public: _all_track_ready = true; } + /** + * 设置事件监听器 + * @param listener 监听器 + */ + void setListener(const std::weak_ptr &listener) override{ + if (_muxer) { + //_muxer对象不能处理的事件再给listener处理 + _muxer->setMediaListener(listener); + } else { + //未创建_muxer对象,事件全部给listener处理 + MediaSource::setListener(listener); + } + } + private: RtspDemuxer::Ptr _demuxer; MultiMediaSourceMuxer::Ptr _muxer; diff --git a/src/Rtsp/RtspMediaSourceMuxer.h b/src/Rtsp/RtspMediaSourceMuxer.h index 10bd920c..eb2a4bd4 100644 --- a/src/Rtsp/RtspMediaSourceMuxer.h +++ b/src/Rtsp/RtspMediaSourceMuxer.h @@ -32,7 +32,7 @@ public: ~RtspMediaSourceMuxer() override{} void setListener(const std::weak_ptr &listener){ - _listener = listener; + setDelegate(listener); _media_src->setListener(shared_from_this()); } diff --git a/src/Rtsp/RtspSession.h b/src/Rtsp/RtspSession.h index f681e874..cd691c7b 100644 --- a/src/Rtsp/RtspSession.h +++ b/src/Rtsp/RtspSession.h @@ -37,17 +37,19 @@ class RtspSession; class BufferRtp : public Buffer{ public: typedef std::shared_ptr Ptr; - BufferRtp(const RtpPacket::Ptr & pkt,uint32_t offset = 0 ):_rtp(pkt),_offset(offset){} - virtual ~BufferRtp(){} + BufferRtp(Buffer::Ptr pkt, uint32_t offset = 0) : _rtp(std::move(pkt)), _offset(offset) {} + ~BufferRtp() override{} char *data() const override { return (char *)_rtp->data() + _offset; } + uint32_t size() const override { return _rtp->size() - _offset; } + private: - RtpPacket::Ptr _rtp; + Buffer::Ptr _rtp; uint32_t _offset; }; diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h index 8b8c57f7..f13ad1c1 100644 --- a/src/TS/TSMediaSource.h +++ b/src/TS/TSMediaSource.h @@ -30,19 +30,8 @@ public: uint32_t time_stamp = 0; }; -//TS直播合并写策略类 -class TSFlushPolicy : public FlushPolicy{ -public: - TSFlushPolicy() = default; - ~TSFlushPolicy() = default; - - uint32_t getStamp(const TSPacket::Ptr &packet) { - return packet->time_stamp; - } -}; - //TS直播源 -class TSMediaSource : public MediaSource, public RingDelegate, public PacketCache{ +class TSMediaSource : public MediaSource, public RingDelegate, public PacketCache{ public: using PoolType = ResourcePool; using Ptr = std::shared_ptr; @@ -83,14 +72,15 @@ public: if (key) { _have_video = true; } - PacketCache::inputPacket(true, std::move(packet), key); + auto stamp = packet->time_stamp; + PacketCache::inputPacket(stamp, true, std::move(packet), key); } /** * 情况GOP缓存 */ void clearCache() override { - PacketCache::clearCache(); + PacketCache::clearCache(); _ring->clearCache(); } diff --git a/src/TS/TSMediaSourceMuxer.h b/src/TS/TSMediaSourceMuxer.h index faea1f08..34051808 100644 --- a/src/TS/TSMediaSourceMuxer.h +++ b/src/TS/TSMediaSourceMuxer.h @@ -31,7 +31,7 @@ public: ~TSMediaSourceMuxer() override = default; void setListener(const std::weak_ptr &listener){ - _listener = listener; + setDelegate(listener); _media_src->setListener(shared_from_this()); }