diff --git a/3rdpart/ZLToolKit b/3rdpart/ZLToolKit index f564c7ed..735f194e 160000 --- a/3rdpart/ZLToolKit +++ b/3rdpart/ZLToolKit @@ -1 +1 @@ -Subproject commit f564c7ed27f141a3b5f08d239970b24d700a1244 +Subproject commit 735f194e18d5fb576c0da03024226a6f38537e5a diff --git a/README.md b/README.md index d5bcf43c..0927223e 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ ## 项目特点 - 基于C++11开发,避免使用裸指针,代码稳定可靠,性能优越。 -- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/HTTP-TS/Websocket-TS/HTTP-fMP4/Websocket-fMP4/MP4),支持协议互转。 +- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/WebSocket-FLV/GB28181/HTTP-TS/WebSocket-TS/HTTP-fMP4/WebSocket-fMP4/MP4),支持协议互转。 - 使用多路复用/多线程/异步网络IO模式开发,并发性能优越,支持海量客户端连接。 - 代码经过长期大量的稳定性、性能测试,已经在线上商用验证已久。 - 支持linux、macos、ios、android、windows全平台。 @@ -65,10 +65,12 @@ - TS - 支持http[s]-ts直播 - 支持ws[s]-ts直播 + - 支持H264/H265/AAC/G711/OPUS编码 - fMP4 - 支持http[s]-fmp4直播 - 支持ws[s]-fmp4直播 + - 支持H264/H265/AAC/G711/OPUS编码 - HTTP[S]与WebSocket - 服务器支持`目录索引生成`,`文件下载`,`表单提交请求` @@ -131,26 +133,30 @@ docker run -id -p 1935:1935 -p 8080:80 -p 8554:554 -p 10000:10000 -p 10000:10000 bash build_docker_images.sh ``` -## 参考案例 +## 合作项目 - - [IOS摄像头实时录制,生成rtsp/rtmp/hls/http-flv](https://gitee.com/xia-chu/IOSMedia) - - [IOS rtmp/rtsp播放器,视频推流器](https://gitee.com/xia-chu/IOSPlayer) - - [支持linux、windows、mac的rtmp/rtsp播放器](https://github.com/xia-chu/ZLMediaPlayer) - - [基于ZLMediaKit分支的管理WEB网站](https://github.com/chenxiaolei/ZLMediaKit_NVR_UI) - - [基于ZLMediaKit主线的管理WEB网站](https://gitee.com/kkkkk5G/MediaServerUI) - - [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk) - - [C#版本的Http API与Hook](https://github.com/chengxiaosheng/ZLMediaKit.HttpApi) - - [GB28181-2016网络视频平台](https://github.com/swwheihei/wvp) - - [node-js版本的GB28181平台](https://gitee.com/hfwudao/GB28181_Node_Http) - - [基于C SDK实现的推流客户端](https://github.com/hctym1995/ZLM_ApiDemo) - - [Go实现的海康ehome服务器](https://github.com/tsingeye/FreeEhome) - + - 可视化管理网站 + - [一个非常漂亮的可视化后台管理系统](https://github.com/MingZhuLiu/ZLMediaServerManagent) + - [基于ZLMediaKit主线的管理WEB网站](https://gitee.com/kkkkk5G/MediaServerUI) + - [基于ZLMediaKit分支的管理WEB网站](https://github.com/chenxiaolei/ZLMediaKit_NVR_UI) + + - 流媒体管理平台 + - [功能强大的流媒体控制管理接口平台,支持GB28181](https://github.com/chatop2020/StreamNode) + - [GB28181-2016网络视频平台](https://github.com/swwheihei/wvp) + - [node-js版本的GB28181平台](https://gitee.com/hfwudao/GB28181_Node_Http) + - [Go实现的海康ehome服务器](https://github.com/tsingeye/FreeEhome) + + - 客户端 + - [基于C SDK实现的推流客户端](https://github.com/hctym1995/ZLM_ApiDemo) + - [C#版本的Http API与Hook](https://github.com/chengxiaosheng/ZLMediaKit.HttpApi) + - [DotNetCore的RESTful客户端](https://github.com/MingZhuLiu/ZLMediaKit.DotNetCore.Sdk) ## 授权协议 本项目自有代码使用宽松的MIT协议,在保留版权信息的情况下可以自由应用于各自商用、非商业的项目。 但是本项目也零碎的使用了一些其他的开源代码,在商用的情况下请自行替代或剔除; -由于使用本项目而产生的商业纠纷或侵权行为一概与本项项目及开发者无关,请自行承担法律风险。 +由于使用本项目而产生的商业纠纷或侵权行为一概与本项目及开发者无关,请自行承担法律风险。 +在使用本项目代码时,也应该在授权协议中同时表明本项目依赖的第三方库的协议。 ## 联系方式 @@ -166,6 +172,12 @@ bash build_docker_images.sh - 3、有些问题,如果不具备参考性的,无需在issue提的,可以在qq群提. - 4、QQ私聊一般不接受无偿技术咨询和支持([为什么不提倡QQ私聊](https://github.com/xia-chu/ZLMediaKit/wiki/%E4%B8%BA%E4%BB%80%E4%B9%88%E4%B8%8D%E5%BB%BA%E8%AE%AEQQ%E7%A7%81%E8%81%8A%E5%92%A8%E8%AF%A2%E9%97%AE%E9%A2%98%EF%BC%9F)). +## 特别感谢 + +本项目采用了[老陈](https://github.com/ireader) 的 [media-server](https://github.com/ireader/media-server) 库, +本项目的 ts/fmp4/mp4/ps 容器格式的复用解复用都依赖media-server库。在实现本项目诸多功能时,老陈多次给予了无私热情关键的帮助, +特此对他表示诚挚的感谢! + ## 致谢 感谢以下各位对本项目包括但不限于代码贡献、问题反馈、资金捐赠等各种方式的支持!以下排名不分先后: @@ -192,11 +204,26 @@ bash build_docker_images.sh [swwheihei](https://github.com/swwheihei) [KKKKK5G](https://gitee.com/kkkkk5G) [Zhou Weimin]() +[Jim Jin](https://github.com/jim-king-2000) +[西瓜丶](<392293307@qq.com>) +[MingZhuLiu](https://github.com/MingZhuLiu) +[chengxiaosheng](https://github.com/chengxiaosheng) +[big panda](<2381267071@qq.com>) +[tanningzhong](https://github.com/tanningzhong) +[hctym1995](https://github.com/hctym1995) + +## 使用案例 + +本项目已经得到不少公司和个人开发者的认可,据作者不完全统计, +使用本项目的公司包括知名的互联网巨头、国内排名前列的云服务公司、多家知名的AI独角兽公司, +以及一系列中小型公司。使用者可以通过在 [issue](https://github.com/xia-chu/ZLMediaKit/issues/511) 上粘贴公司的大名和相关项目介绍为本项目背书,感谢支持! ## 捐赠 +您的捐赠将用于支付该项目的一些费用支出以及激励开发者, 欢迎捐赠以便更好的推动项目的发展,谢谢您的支持! +同时欢迎捐赠公网服务器用于在线展示效果。 [支付宝](https://gitee.com/xia-chu/other/raw/master/IMG_3919.JPG) diff --git a/conf/config.ini b/conf/config.ini index 16d301b8..dfe343b9 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -52,6 +52,8 @@ mergeWriteMS=0 #会直接影响rtsp/rtmp/hls/mp4/flv等协议的时间戳 #同协议情况下不影响(例如rtsp/rtmp推流,那么播放rtsp/rtmp时不会影响时间戳) modifyStamp=0 +#服务器唯一id,用于触发hook时区别是哪台服务器 +mediaServerId=your_server_id [hls] #hls写文件的buf大小,调整参数可以提高文件io性能 diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index b20e2b4d..3b1d5a43 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -463,16 +463,6 @@ "value": "rtmp://live.hkstv.hk.lxdns.com/live/hks2", "description": "拉流地址,例如rtmp://live.hkstv.hk.lxdns.com/live/hks2" }, - { - "key": "enable_rtsp", - "value": "1", - "description": "是否转rtsp" - }, - { - "key": "enable_rtmp", - "value": "1", - "description": "是否转rtmp" - }, { "key": "enable_hls", "value": null, @@ -555,7 +545,7 @@ }, { "key": "dst_url", - "value": "rtmp://10.8.9.115:8554/live/hks2", + "value": "rtmp://127.0.0.1/live/hks2", "description": "FFmpeg rtmp推流地址,一般都是推给自己,例如rtmp://127.0.0.1/live/stream_form_ffmpeg" }, { @@ -780,7 +770,7 @@ { "key": "customized_path", "value": null, - "description": "录像保存目录" + "description": "录像文件保存自定义根目录,为空则采用配置文件设置" } ] } diff --git a/server/FFmpegSource.cpp b/server/FFmpegSource.cpp index 21e87b41..35856aaa 100644 --- a/server/FFmpegSource.cpp +++ b/server/FFmpegSource.cpp @@ -197,8 +197,8 @@ void FFmpegSource::startTimer(int timeout_ms) { strongSelf->findAsync(0, [&](const MediaSource::Ptr &src) { //同步查找流 if (!src) { - //流不在线,重新拉流 - if(strongSelf->_replay_ticker.elapsedTime() > 10 * 1000){ + //流不在线,重新拉流, 这里原先是10秒超时,实际发现10秒不够,改成20秒了 + if(strongSelf->_replay_ticker.elapsedTime() > 20 * 1000){ //上次重试时间超过10秒,那么再重试FFmpeg拉流 strongSelf->_replay_ticker.resetTime(); strongSelf->play(strongSelf->_src_url, strongSelf->_dst_url, timeout_ms, [](const SockException &) {}); @@ -276,20 +276,27 @@ void FFmpegSnap::makeSnap(const string &play_url, const string &save_path, float GET_CONFIG(string,ffmpeg_bin,FFmpeg::kBin); GET_CONFIG(string,ffmpeg_snap,FFmpeg::kSnap); GET_CONFIG(string,ffmpeg_log,FFmpeg::kLog); - - std::shared_ptr process = std::make_shared(); - auto delayTask = EventPollerPool::Instance().getPoller()->doDelayTask(timeout_sec * 1000,[process,cb](){ - if(process->wait(false)){ - //FFmpeg进程还在运行,超时就关闭它 - process->kill(2000); + Ticker ticker; + WorkThreadPool::Instance().getPoller()->async([timeout_sec, play_url,save_path,cb, ticker](){ + auto elapsed_ms = ticker.elapsedTime(); + if (elapsed_ms > timeout_sec * 1000) { + //超时,后台线程负载太高,当代太久才启动该任务 + cb(false); + return; } - return 0; - }); - - WorkThreadPool::Instance().getPoller()->async([process,play_url,save_path,delayTask,cb](){ char cmd[1024] = {0}; snprintf(cmd, sizeof(cmd),ffmpeg_snap.data(),ffmpeg_bin.data(),play_url.data(),save_path.data()); + std::shared_ptr process = std::make_shared(); process->run(cmd,ffmpeg_log.empty() ? "" : File::absolutePath("",ffmpeg_log)); + //定时器延时应该减去后台任务启动的延时 + auto delayTask = EventPollerPool::Instance().getPoller()->doDelayTask(timeout_sec * 1000 - elapsed_ms,[process,cb](){ + if(process->wait(false)){ + //FFmpeg进程还在运行,超时就关闭它 + process->kill(2000); + } + return 0; + }); + //等待FFmpeg进程退出 process->wait(true); //FFmpeg进程退出了可以取消定时器了 diff --git a/server/Process.cpp b/server/Process.cpp index 7a5f7c18..47ab2d21 100644 --- a/server/Process.cpp +++ b/server/Process.cpp @@ -25,7 +25,7 @@ #include "Util/File.h" #include "Util/logger.h" #include "Util/uv_errno.h" -#include "Thread/WorkThreadPool.h" +#include "Poller/EventPoller.h" #include "Process.h" using namespace toolkit; @@ -261,7 +261,7 @@ static void s_kill(pid_t pid, void *handle, int max_delay, bool force) { } //发送SIGTERM信号后,2秒后检查子进程是否已经退出 - WorkThreadPool::Instance().getPoller()->doDelayTask(max_delay, [pid, handle]() { + EventPollerPool::Instance().getPoller()->doDelayTask(max_delay, [pid, handle]() { if (!s_wait(pid, handle, nullptr, false)) { //进程已经退出了 return 0; diff --git a/server/WebApi.cpp b/server/WebApi.cpp index e15ba753..b3bb4030 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -403,6 +403,9 @@ void installWebApi() { item["vhost"] = media->getVhost(); item["app"] = media->getApp(); item["stream"] = media->getId(); + item["createStamp"] = (Json::UInt64) media->getCreateStamp(); + item["aliveSecond"] = (Json::UInt64) media->getAliveSecond(); + item["bytesSpeed"] = media->getBytesSpeed(); item["readerCount"] = media->readerCount(); item["totalReaderCount"] = media->totalReaderCount(); item["originType"] = (int) media->getOriginType(); diff --git a/server/WebHook.cpp b/server/WebHook.cpp index 0ef0e7ba..62a7a97d 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -134,6 +134,9 @@ const char *getContentType(const HttpArgs &value){ } static void do_http_hook(const string &url,const ArgsType &body,const function &fun){ + GET_CONFIG(string,mediaServerId,General::kMediaServerId); + const_cast(body)["mediaServerId"] = mediaServerId; + GET_CONFIG(float,hook_timeoutSec,Hook::kTimeoutSec); HttpRequester::Ptr requester(new HttpRequester); requester->setMethod("POST"); diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index 6f5f564e..d548a7d1 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -43,6 +43,7 @@ MediaSource::MediaSource(const string &schema, const string &vhost, const string _schema = schema; _app = app; _stream_id = stream_id; + _create_stamp = time(NULL); } MediaSource::~MediaSource() { @@ -66,6 +67,19 @@ const string& MediaSource::getId() const { return _stream_id; } +int MediaSource::getBytesSpeed(){ + return _speed.getSpeed(); +} + +uint64_t MediaSource::getCreateStamp() const { + return _create_stamp; +} + +uint64_t MediaSource::getAliveSecond() const { + //使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退 + return _ticker.createdTime() / 1000; +} + vector MediaSource::getTracks(bool ready) const { auto listener = _listener.lock(); if(!listener){ diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 039cf127..1efd4b2b 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -137,6 +137,52 @@ public: string _param_strs; }; +class BytesSpeed { +public: + BytesSpeed() = default; + ~BytesSpeed() = default; + + /** + * 添加统计字节 + */ + BytesSpeed& operator += (uint64_t bytes) { + _bytes += bytes; + if (_bytes > 1024 * 1024) { + //数据大于1MB就计算一次网速 + computeSpeed(); + } + return *this; + } + + /** + * 获取速度,单位bytes/s + */ + int getSpeed() { + if (_ticker.elapsedTime() < 1000) { + //获取频率小于1秒,那么返回上次计算结果 + return _speed; + } + return computeSpeed(); + } + +private: + uint64_t computeSpeed() { + auto elapsed = _ticker.elapsedTime(); + if (!elapsed) { + return _speed; + } + _speed = _bytes * 1000 / elapsed; + _ticker.resetTime(); + _bytes = 0; + return _speed; + } + +private: + int _speed = 0; + uint64_t _bytes = 0; + Ticker _ticker; +}; + /** * 媒体源,任何rtsp/rtmp的直播流都源自该对象 */ @@ -170,6 +216,13 @@ public: // 设置时间戳 virtual void setTimeStamp(uint32_t stamp) {}; + // 获取数据速率,单位bytes/s + int getBytesSpeed(); + // 获取流创建GMT unix时间戳,单位秒 + uint64_t getCreateStamp() const; + // 获取流上线时间,单位秒 + uint64_t getAliveSecond() const; + ////////////////MediaSourceEvent相关接口实现//////////////// // 设置监听者 @@ -229,7 +282,12 @@ private: //触发媒体事件 void emitEvent(bool regist); +protected: + BytesSpeed _speed; + private: + time_t _create_stamp; + Ticker _ticker; string _schema; string _vhost; string _app; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 11b9ad37..8209689f 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -60,6 +60,7 @@ const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed"; //通用配置项目 namespace General{ #define GENERAL_FIELD "general." +const string kMediaServerId = GENERAL_FIELD"mediaServerId"; const string kFlowThreshold = GENERAL_FIELD"flowThreshold"; const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS"; const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS"; @@ -82,6 +83,7 @@ onceToken token([](){ mINI::Instance()[kPublishToMP4] = 0; mINI::Instance()[kMergeWriteMS] = 0; mINI::Instance()[kModifyStamp] = 0; + mINI::Instance()[kMediaServerId] = makeRandStr(16); },nullptr); }//namespace General diff --git a/src/Common/config.h b/src/Common/config.h index f640851b..cd62953b 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -162,6 +162,8 @@ extern const string kBroadcastReloadConfig; ////////////通用配置/////////// namespace General{ +//每个流媒体服务器的ID(GUID) +extern const string kMediaServerId; //流量汇报事件流量阈值,单位KB,默认1MB extern const string kFlowThreshold; //流无人观看并且超过若干时间后才触发kBroadcastStreamNoneReader事件 diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h index 277d4bf8..173f12e4 100644 --- a/src/FMP4/FMP4MediaSource.h +++ b/src/FMP4/FMP4MediaSource.h @@ -99,6 +99,7 @@ public: if (key) { _have_video = true; } + _speed += packet->size(); PacketCache::inputPacket(true, packet, key); } diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index 22627b3e..464ee643 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -105,6 +105,9 @@ void HlsMakerImp::onWriteSegment(const char *data, int len) { if (_file) { fwrite(data, len, 1, _file.get()); } + if (_media_src) { + _media_src->onSegmentSize(len); + } } void HlsMakerImp::onWriteHls(const char *data, int len) { diff --git a/src/Record/HlsMediaSource.h b/src/Record/HlsMediaSource.h index b5b8357f..32e46cc5 100644 --- a/src/Record/HlsMediaSource.h +++ b/src/Record/HlsMediaSource.h @@ -79,6 +79,10 @@ public: _list_cb.emplace_back(std::move(cb)); } + void onSegmentSize(uint64_t bytes) { + _speed += bytes; + } + private: bool _is_regist = false; RingType::Ptr _ring; diff --git a/src/Record/Recorder.cpp b/src/Record/Recorder.cpp index 21c59b5a..62f32d76 100644 --- a/src/Record/Recorder.cpp +++ b/src/Record/Recorder.cpp @@ -32,7 +32,7 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s } //Here we use the customized file path. if (!customized_path.empty()) { - m3u8FilePath = customized_path + "/hls.m3u8"; + return File::absolutePath(m3u8FilePath, customized_path); } return File::absolutePath(m3u8FilePath, hlsPath); } @@ -47,7 +47,7 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s } //Here we use the customized file path. if (!customized_path.empty()) { - mp4FilePath = customized_path + "/"; + return File::absolutePath(mp4FilePath, customized_path); } return File::absolutePath(mp4FilePath, recordPath); } diff --git a/src/Record/Recorder.h b/src/Record/Recorder.h index b057c728..4017e0b1 100644 --- a/src/Record/Recorder.h +++ b/src/Record/Recorder.h @@ -47,7 +47,7 @@ public: * @param vhost 虚拟主机 * @param app 应用名 * @param stream_id 流id - * @param customized_path 录像文件保存自定义目录,默认为空则自动生成 + * @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置 * @return 录制文件绝对路径 */ static string getRecordPath(type type, const string &vhost, const string &app, const string &stream_id,const string &customized_path = ""); @@ -58,7 +58,7 @@ public: * @param vhost 虚拟主机 * @param app 应用名 * @param stream_id 流id - * @param customized_path 录像文件保存自定义目录,默认为空则自动生成 + * @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置 * @return 对象指针,可能为nullptr */ static std::shared_ptr createRecorder(type type, const string &vhost, const string &app, const string &stream_id, const string &customized_path = ""); @@ -79,7 +79,7 @@ public: * @param vhost 虚拟主机 * @param app 应用名 * @param stream_id 流id - * @param customized_path 录像文件保存自定义目录,默认为空则自动生成 + * @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置 * @return 成功与否 */ static bool startRecord(type type, const string &vhost, const string &app, const string &stream_id,const string &customized_path); diff --git a/src/Rtmp/RtmpMediaSource.h b/src/Rtmp/RtmpMediaSource.h index cd3f6b5c..977f77de 100644 --- a/src/Rtmp/RtmpMediaSource.h +++ b/src/Rtmp/RtmpMediaSource.h @@ -119,6 +119,7 @@ public: * @param pkt rtmp包 */ void onWrite(const RtmpPacket::Ptr &pkt, bool = true) override { + _speed += pkt->size(); //保存当前时间戳 switch (pkt->type_id) { case MSG_VIDEO : _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 0b10063c..6268c358 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -11,11 +11,6 @@ #include "Common/config.h" #include "RtpReceiver.h" -#define POP_HEAD(trackidx) \ - auto it = _rtp_sort_cache_map[trackidx].begin(); \ - onRtpSorted(it->second, trackidx); \ - _rtp_sort_cache_map[trackidx].erase(it); - #define AV_RB16(x) \ ((((const uint8_t*)(x))[0] << 8) | \ ((const uint8_t*)(x))[1]) @@ -24,7 +19,18 @@ namespace mediakit { -RtpReceiver::RtpReceiver() {} +RtpReceiver::RtpReceiver() { + GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount); + GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount); + int index = 0; + for (auto &sortor : _rtp_sortor) { + sortor.setup(maxRtpCount, clearCount); + sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) { + onRtpSorted(packet, index); + }); + ++index; + } +} RtpReceiver::~RtpReceiver() {} bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) { @@ -80,7 +86,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, if (_ssrc_err_count[track_index]++ > 10) { //ssrc切换后清除老数据 WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc; - _rtp_sort_cache_map[track_index].clear(); + _rtp_sortor[track_index].clear(); _ssrc[track_index] = rtp.ssrc; } return false; @@ -127,56 +133,15 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, } void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){ - if(rtp->sequence != _last_seq[track_index] + 1 && _last_seq[track_index] != 0){ - //包乱序或丢包 - _seq_ok_count[track_index] = 0; - _sort_started[track_index] = true; - if(_last_seq[track_index] > rtp->sequence && _last_seq[track_index] - rtp->sequence > 0xFF){ - //sequence回环,清空所有排序缓存 - while (_rtp_sort_cache_map[track_index].size()) { - POP_HEAD(track_index) - } - ++_seq_cycle_count[track_index]; - } - }else{ - //正确序列的包 - _seq_ok_count[track_index]++; - } - - _last_seq[track_index] = rtp->sequence; - - //开始排序缓存 - if (_sort_started[track_index]) { - _rtp_sort_cache_map[track_index].emplace(rtp->sequence, rtp); - GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount); - GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount); - if (_seq_ok_count[track_index] >= clearCount) { - //网络环境改善,需要清空排序缓存 - _seq_ok_count[track_index] = 0; - _sort_started[track_index] = false; - while (_rtp_sort_cache_map[track_index].size()) { - POP_HEAD(track_index) - } - } else if (_rtp_sort_cache_map[track_index].size() >= maxRtpCount) { - //排序缓存溢出 - POP_HEAD(track_index) - } - }else{ - //正确序列 - onRtpSorted(rtp, track_index); - } + _rtp_sortor[track_index].sortPacket(rtp->sequence, rtp); } void RtpReceiver::clear() { - CLEAR_ARR(_last_seq); CLEAR_ARR(_ssrc); CLEAR_ARR(_ssrc_err_count); - CLEAR_ARR(_seq_ok_count); - CLEAR_ARR(_sort_started); - CLEAR_ARR(_seq_cycle_count); - - _rtp_sort_cache_map[0].clear(); - _rtp_sort_cache_map[1].clear(); + for (auto &sortor : _rtp_sortor) { + sortor.clear(); + } } void RtpReceiver::setPoolSize(int size) { @@ -184,11 +149,11 @@ void RtpReceiver::setPoolSize(int size) { } int RtpReceiver::getJitterSize(int track_index){ - return _rtp_sort_cache_map[track_index].size(); + return _rtp_sortor[track_index].getJitterSize(); } int RtpReceiver::getCycleCount(int track_index){ - return _seq_cycle_count[track_index]; + return _rtp_sortor[track_index].getCycleCount(); } diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 243fef20..f4c7e94b 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -11,24 +11,141 @@ #ifndef ZLMEDIAKIT_RTPRECEIVER_H #define ZLMEDIAKIT_RTPRECEIVER_H - #include #include #include #include "RtpCodec.h" #include "RtspMediaSource.h" - using namespace std; using namespace toolkit; namespace mediakit { +template +class PacketSortor { +public: + PacketSortor() = default; + ~PacketSortor() = default; + + /** + * 设置参数 + * @param max_sort_size 最大排序缓存长度 + * @param clear_sort_size seq连续次数超过该值后,清空并关闭排序缓存 + */ + void setup(uint32_t max_sort_size, uint32_t clear_sort_size) { + _max_sort_size = max_sort_size; + _clear_sort_size = clear_sort_size; + } + + void setOnSort(function cb){ + _cb = std::move(cb); + } + + /** + * 清空状态 + */ + void clear() { + _last_seq = 0; + _seq_ok_count = 0; + _sort_started = 0; + _seq_cycle_count = 0; + _rtp_sort_cache_map.clear(); + } + + /** + * 获取排序缓存长度 + */ + int getJitterSize(){ + return _rtp_sort_cache_map.size(); + } + + /** + * 获取seq回环次数 + */ + int getCycleCount(){ + return _seq_cycle_count; + } + + /** + * 输入并排序 + * @param seq 序列号 + * @param packet 包负载 + */ + void sortPacket(SEQ seq, const T &packet){ + if (seq != _last_seq + 1 && _last_seq != 0) { + //包乱序或丢包 + _seq_ok_count = 0; + _sort_started = true; + if (_last_seq > seq && _last_seq - seq > 0xFF) { + //sequence回环,清空所有排序缓存 + while (_rtp_sort_cache_map.size()) { + popPacket(); + } + ++_seq_cycle_count; + } + } else { + //正确序列的包 + _seq_ok_count++; + } + + _last_seq = seq; + + //开始排序缓存 + if (_sort_started) { + _rtp_sort_cache_map.emplace(seq, packet); + if (_seq_ok_count >= _clear_sort_size) { + //网络环境改善,需要清空排序缓存 + _seq_ok_count = 0; + _sort_started = false; + while (_rtp_sort_cache_map.size()) { + popPacket(); + } + } else if (_rtp_sort_cache_map.size() >= _max_sort_size) { + //排序缓存溢出 + popPacket(); + } + } else { + //正确序列 + onPacketSorted(seq, packet); + } + } + +private: + void popPacket() { + auto it = _rtp_sort_cache_map.begin(); + onPacketSorted(it->first, it->second); + _rtp_sort_cache_map.erase(it); + } + + void onPacketSorted(SEQ seq, const T &packet) { + _cb(seq, packet); + } + +private: + //是否开始seq排序 + bool _sort_started = false; + //上次seq + SEQ _last_seq = 0; + //seq连续次数计数 + uint32_t _seq_ok_count = 0; + //seq回环次数计数 + uint32_t _seq_cycle_count = 0; + //排序缓存长度 + uint32_t _max_sort_size; + //seq连续次数超过该值后,清空并关闭排序缓存 + uint32_t _clear_sort_size; + //rtp排序缓存,根据seq排序 + map _rtp_sort_cache_map; + //回调 + function _cb; +}; + class RtpReceiver { public: RtpReceiver(); virtual ~RtpReceiver(); -protected: +protected: /** * 输入数据指针生成并排序rtp包 * @param track_index track下标索引 @@ -46,6 +163,7 @@ protected: * @param track_index track索引 */ virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){} + void clear(); void setPoolSize(int size); int getJitterSize(int track_index); @@ -58,16 +176,8 @@ private: uint32_t _ssrc[2] = { 0, 0 }; //ssrc不匹配计数 uint32_t _ssrc_err_count[2] = { 0, 0 }; - //上次seq - uint16_t _last_seq[2] = { 0 , 0 }; - //seq连续次数计数 - uint32_t _seq_ok_count[2] = { 0 , 0}; - //seq回环次数计数 - uint32_t _seq_cycle_count[2] = { 0 , 0}; - //是否开始seq排序 - bool _sort_started[2] = { 0 , 0}; //rtp排序缓存,根据seq排序 - map _rtp_sort_cache_map[2]; + PacketSortor _rtp_sortor[2]; //rtp循环池 RtspMediaSource::PoolType _rtp_pool; }; diff --git a/src/Rtsp/RtspMediaSource.h b/src/Rtsp/RtspMediaSource.h index a67a4b34..341aca5c 100644 --- a/src/Rtsp/RtspMediaSource.h +++ b/src/Rtsp/RtspMediaSource.h @@ -157,6 +157,7 @@ public: * @param keyPos 该包是否为关键帧的第一个包 */ void onWrite(const RtpPacket::Ptr &rtp, bool keyPos) override { + _speed += rtp->size(); assert(rtp->type >= 0 && rtp->type < TrackMax); auto track = _tracks[rtp->type]; if (track) { diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h index c7b05f53..2a5a7080 100644 --- a/src/TS/TSMediaSource.h +++ b/src/TS/TSMediaSource.h @@ -76,6 +76,7 @@ public: * @param key 是否为关键帧第一个包 */ void onWrite(const TSPacket::Ptr &packet, bool key) override { + _speed += packet->size(); if (!_ring) { createRing(); }