diff --git a/3rdpart/media-server b/3rdpart/media-server index 82a48c57..d54285e9 160000 --- a/3rdpart/media-server +++ b/3rdpart/media-server @@ -1 +1 @@ -Subproject commit 82a48c57ae5269aaa70a8c3df8a67dc06b05d0cd +Subproject commit d54285e96260e6d56d68929ec9ace402b83ff6b0 diff --git a/AUTHORS b/AUTHORS index c80c92fb..729a7e4c 100644 --- a/AUTHORS +++ b/AUTHORS @@ -14,3 +14,6 @@ huohuo <913481084@qq.com> [γ瑞γミ](https://github.com/JerryLinGd) [茄子](https://github.com/taotaobujue2008) [好心情](<409257224@qq.com>) +[Xiaofeng Wang](https://github.com/wasphin) +[doodoocoder](https://github.com/doodoocoder) +[qingci](https://github.com/Colibrow) diff --git a/README.md b/README.md index 09eadac5..48fa4489 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ ## 项目特点 - 基于C++11开发,避免使用裸指针,代码稳定可靠,性能优越。 -- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/MP4),支持协议互转。 +- 支持多种协议(RTSP/RTMP/HLS/HTTP-FLV/Websocket-FLV/GB28181/HTTP-TS/Websocket-TS/HTTP-fMP4/Websocket-fMP4/MP4),支持协议互转。 - 使用多路复用/多线程/异步网络IO模式开发,并发性能优越,支持海量客户端连接。 - 代码经过长期大量的稳定性、性能测试,已经在线上商用验证已久。 - 支持linux、macos、ios、android、windows全平台。 @@ -59,6 +59,14 @@ - 通过cookie追踪技术,可以模拟HLS播放为长连接,可以实现HLS按需拉流、播放统计等业务 - 支持HLS播发器,支持拉流HLS转rtsp/rtmp/mp4 - 支持H264/H265/AAC/G711/OPUS编码 + +- TS + - 支持http[s]-ts直播 + - 支持ws[s]-ts直播 + +- fMP4 + - 支持http[s]-fmp4直播 + - 支持ws[s]-fmp4直播 - HTTP[S]与WebSocket - 服务器支持`目录索引生成`,`文件下载`,`表单提交请求` @@ -112,7 +120,7 @@ 你可以从Docker Hub下载已经编译好的镜像并启动它: ```bash -docker run -id -p 1935:1935 -p 8080:80 gemfield/zlmediakit:20.04-runtime-ubuntu18.04 +docker run -id -p 1935:1935 -p 8080:80 -p 8554:554 -p 10000:10000 -p 10000:10000/udp panjjo/zlmediakit ``` 你也可以根据Dockerfile编译镜像: @@ -176,6 +184,12 @@ bash build_docker_images.sh [茄子](https://github.com/taotaobujue2008) [好心情](<409257224@qq.com>) [浮沉](https://github.com/MingZhuLiu) +[Xiaofeng Wang](https://github.com/wasphin) +[doodoocoder](https://github.com/doodoocoder) +[qingci](https://github.com/Colibrow) +[swwheihei](https://github.com/swwheihei) +[KKKKK5G](https://gitee.com/kkkkk5G) + ## 捐赠 diff --git a/README_en.md b/README_en.md index 32d3ea2d..7193f7b2 100644 --- a/README_en.md +++ b/README_en.md @@ -11,7 +11,7 @@ ## Why ZLMediaKit? - Developed based on C++ 11, the code is stable and reliable, avoiding the use of raw pointers, cross-platform porting is simple and convenient, and the code is clear and concise. -- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/Websocket-flv`),and support Inter-protocol conversion. +- Support rich streaming media protocols(`RTSP/RTMP/HLS/HTTP-FLV/WebSocket-flv/HTTP-TS/WebSocket-TS/HTTP-fMP4/Websocket-fMP4/MP4`),and support Inter-protocol conversion. - Multiplexing asynchronous network IO based on epoll and multi thread,extreme performance. - Well performance and stable test,can be used commercially. - Support linux, macos, ios, android, Windows Platforms. @@ -31,7 +31,7 @@ - RTMP[S] - RTMP[S] server,support player and pusher. - RTMP[S] player and pusher. - - Support HTTP-FLV player. + - Support HTTP-FLV/WebSocket-FLV sever. - H265/H264/AAC/G711/OPUS codec. - Recorded as flv or mp4. - Vod of mp4. @@ -41,6 +41,12 @@ - RTSP RTMP can be converted into HLS,built-in HTTP server. - Play authentication based on cookie. - Support HLS player, support streaming HLS proxy to RTSP / RTMP / MP4. + +- TS + - Support HTTP-TS/WebSocket-TS sever. + +- fMP4 + - Support HTTP-fMP4/WebSocket-fMP4 sever. - HTTP[S] - HTTP server,suppor directory meun、RESTful http api. @@ -244,7 +250,7 @@ git submodule update --init ## Docker Image You can pull a pre-built docker image from Docker Hub and run with ```bash -docker run -id -p 1935:1935 -p 8080:80 gemfield/zlmediakit +docker run -id -p 1935:1935 -p 8080:80 -p 8554:554 -p 10000:10000 -p 10000:10000/udp panjjo/zlmediakit ``` Dockerfile is also supplied to build images on Ubuntu 16.04 diff --git a/api/include/mk_events_objects.h b/api/include/mk_events_objects.h index bdcca23c..2aba7ada 100644 --- a/api/include/mk_events_objects.h +++ b/api/include/mk_events_objects.h @@ -19,25 +19,25 @@ extern "C" { ///////////////////////////////////////////MP4Info///////////////////////////////////////////// //MP4Info对象的C映射 typedef void* mk_mp4_info; -//MP4Info::ui64StartedTime +// GMT 标准时间,单位秒 API_EXPORT uint64_t API_CALL mk_mp4_info_get_start_time(const mk_mp4_info ctx); -//MP4Info::ui64TimeLen -API_EXPORT uint64_t API_CALL mk_mp4_info_get_time_len(const mk_mp4_info ctx); -//MP4Info::ui64FileSize +// 录像长度,单位秒 +API_EXPORT float API_CALL mk_mp4_info_get_time_len(const mk_mp4_info ctx); +// 文件大小,单位 BYTE API_EXPORT uint64_t API_CALL mk_mp4_info_get_file_size(const mk_mp4_info ctx); -//MP4Info::strFilePath +// 文件路径 API_EXPORT const char* API_CALL mk_mp4_info_get_file_path(const mk_mp4_info ctx); -//MP4Info::strFileName +// 文件名称 API_EXPORT const char* API_CALL mk_mp4_info_get_file_name(const mk_mp4_info ctx); -//MP4Info::strFolder +// 文件夹路径 API_EXPORT const char* API_CALL mk_mp4_info_get_folder(const mk_mp4_info ctx); -//MP4Info::strUrl +// 播放路径 API_EXPORT const char* API_CALL mk_mp4_info_get_url(const mk_mp4_info ctx); -//MP4Info::strVhost +// 应用名称 API_EXPORT const char* API_CALL mk_mp4_info_get_vhost(const mk_mp4_info ctx); -//MP4Info::strAppName +// 流 ID API_EXPORT const char* API_CALL mk_mp4_info_get_app(const mk_mp4_info ctx); -//MP4Info::strStreamId +// 虚拟主机 API_EXPORT const char* API_CALL mk_mp4_info_get_stream(const mk_mp4_info ctx); ///////////////////////////////////////////Parser///////////////////////////////////////////// diff --git a/api/include/mk_media.h b/api/include/mk_media.h index 8b7b5974..d41f3dc7 100755 --- a/api/include/mk_media.h +++ b/api/include/mk_media.h @@ -26,14 +26,12 @@ typedef void *mk_media; * @param app 应用名,推荐为live * @param stream 流id,例如camera * @param duration 时长(单位秒),直播则为0 - * @param rtsp_enabled 是否启用rtsp协议 - * @param rtmp_enabled 是否启用rtmp协议 * @param hls_enabled 是否生成hls * @param mp4_enabled 是否生成mp4 * @return 对象指针 */ -API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration, - int rtsp_enabled, int rtmp_enabled, int hls_enabled, int mp4_enabled); +API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, + float duration, int hls_enabled, int mp4_enabled); /** * 销毁媒体源 diff --git a/api/include/mk_mediakit.h b/api/include/mk_mediakit.h index dc7c07c7..18cfd589 100755 --- a/api/include/mk_mediakit.h +++ b/api/include/mk_mediakit.h @@ -22,5 +22,6 @@ #include "mk_tcp.h" #include "mk_util.h" #include "mk_thread.h" +#include "mk_rtp_server.h" #endif /* MK_API_H_ */ diff --git a/api/include/mk_rtp_server.h b/api/include/mk_rtp_server.h new file mode 100644 index 00000000..451cf766 --- /dev/null +++ b/api/include/mk_rtp_server.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "mk_common.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef void* mk_rtp_server; + +/** + * 创建GB28181 RTP 服务器 + * @param port 监听端口,0则为随机 + * @param enable_tcp 创建udp端口时是否同时监听tcp端口 + * @param stream_id 该端口绑定的流id + * @return + */ +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id); + +/** + * 销毁GB28181 RTP 服务器 + * @param ctx 服务器对象 + */ +API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx); + +/** + * 获取本地监听的端口号 + * @param ctx 服务器对象 + * @return 端口号 + */ +API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx); + +/** + * GB28181 RTP 服务器接收流超时时触发 + * @param user_data 用户数据指针 + */ +typedef void(API_CALL *on_mk_rtp_server_detach)(void *user_data); + +/** + * 监听B28181 RTP 服务器接收流超时事件 + * @param ctx 服务器对象 + * @param cb 回调函数 + * @param user_data 回调函数用户数据指针 + */ +API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data); + + +#ifdef __cplusplus +} +#endif \ No newline at end of file diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 72c30ed2..d078845b 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -18,65 +18,65 @@ #include "Rtsp/RtspSession.h" using namespace mediakit; -///////////////////////////////////////////MP4Info///////////////////////////////////////////// +///////////////////////////////////////////RecordInfo///////////////////////////////////////////// API_EXPORT uint64_t API_CALL mk_mp4_info_get_start_time(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->ui64StartedTime; + RecordInfo *info = (RecordInfo *)ctx; + return info->start_time; } -API_EXPORT uint64_t API_CALL mk_mp4_info_get_time_len(const mk_mp4_info ctx){ +API_EXPORT float API_CALL mk_mp4_info_get_time_len(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->ui64TimeLen; + RecordInfo *info = (RecordInfo *)ctx; + return info->time_len; } API_EXPORT uint64_t API_CALL mk_mp4_info_get_file_size(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->ui64FileSize; + RecordInfo *info = (RecordInfo *)ctx; + return info->file_size; } API_EXPORT const char* API_CALL mk_mp4_info_get_file_path(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strFilePath.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->file_path.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_file_name(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strFileName.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->file_name.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_folder(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strFolder.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->folder.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_url(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strUrl.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->url.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_vhost(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strVhost.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->vhost.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_app(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strAppName.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->app.c_str(); } API_EXPORT const char* API_CALL mk_mp4_info_get_stream(const mk_mp4_info ctx){ assert(ctx); - MP4Info *info = (MP4Info *)ctx; - return info->strStreamId.c_str(); + RecordInfo *info = (RecordInfo *)ctx; + return info->stream.c_str(); } ///////////////////////////////////////////Parser///////////////////////////////////////////// diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 73f7b07b..1ad6ba64 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -117,11 +117,10 @@ API_EXPORT int API_CALL mk_media_total_reader_count(mk_media ctx){ return (*obj)->getChannel()->totalReaderCount(); } -API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, float duration, - int rtsp_enabled, int rtmp_enabled, int hls_enabled, int mp4_enabled) { +API_EXPORT mk_media API_CALL mk_media_create(const char *vhost, const char *app, const char *stream, + float duration, int hls_enabled, int mp4_enabled) { assert(vhost && app && stream); - MediaHelper::Ptr *obj(new MediaHelper::Ptr(new MediaHelper(vhost, app, stream, duration, - rtsp_enabled, rtmp_enabled, hls_enabled, mp4_enabled))); + MediaHelper::Ptr *obj(new MediaHelper::Ptr(new MediaHelper(vhost, app, stream, duration, hls_enabled, mp4_enabled))); (*obj)->attachEvent(); return (mk_media) obj; } diff --git a/api/source/mk_proxyplayer.cpp b/api/source/mk_proxyplayer.cpp index 91e496af..0ae63f1d 100644 --- a/api/source/mk_proxyplayer.cpp +++ b/api/source/mk_proxyplayer.cpp @@ -16,7 +16,7 @@ using namespace mediakit; API_EXPORT mk_proxy_player API_CALL mk_proxy_player_create(const char *vhost, const char *app, const char *stream, int hls_enabled, int mp4_enabled) { assert(vhost && app && stream); - PlayerProxy::Ptr *obj(new PlayerProxy::Ptr(new PlayerProxy(vhost, app, stream, true, true, hls_enabled, mp4_enabled))); + PlayerProxy::Ptr *obj(new PlayerProxy::Ptr(new PlayerProxy(vhost, app, stream, hls_enabled, mp4_enabled))); return (mk_proxy_player) obj; } diff --git a/api/source/mk_rtp_server.cpp b/api/source/mk_rtp_server.cpp new file mode 100644 index 00000000..a36b9363 --- /dev/null +++ b/api/source/mk_rtp_server.cpp @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "mk_rtp_server.h" +#include "Rtp/RtpServer.h" +using namespace mediakit; + +API_EXPORT mk_rtp_server API_CALL mk_rtp_server_create(uint16_t port, int enable_tcp, const char *stream_id){ + RtpServer::Ptr *server = new RtpServer::Ptr(new RtpServer); + (*server)->start(port, stream_id, enable_tcp); + return server; +} + +API_EXPORT void API_CALL mk_rtp_server_release(mk_rtp_server ctx){ + RtpServer::Ptr *server = (RtpServer::Ptr *)ctx; + delete server; +} + +API_EXPORT uint16_t API_CALL mk_rtp_server_port(mk_rtp_server ctx){ + RtpServer::Ptr *server = (RtpServer::Ptr *)ctx; + return (*server)->getPort(); +} + +API_EXPORT void API_CALL mk_rtp_server_set_on_detach(mk_rtp_server ctx, on_mk_rtp_server_detach cb, void *user_data){ + RtpServer::Ptr *server = (RtpServer::Ptr *) ctx; + if (cb) { + (*server)->setOnDetach([cb, user_data]() { + cb(user_data); + }); + } else { + (*server)->setOnDetach(nullptr); + } +} diff --git a/conf/config.ini b/conf/config.ini index b4c54b73..16d301b8 100644 --- a/conf/config.ini +++ b/conf/config.ini @@ -66,6 +66,8 @@ segDur=2 segNum=3 #HLS切片从m3u8文件中移除后,继续保留在磁盘上的个数 segRetain=5 +# 是否广播 ts 切片完成通知 +broadcastRecordTs=0 [hook] #在推流时,如果url参数匹对admin_params,那么可以不经过hook鉴权直接推流成功,播放时亦然 @@ -83,6 +85,8 @@ on_play=https://127.0.0.1/index/hook/on_play on_publish=https://127.0.0.1/index/hook/on_publish #录制mp4切片完成事件 on_record_mp4=https://127.0.0.1/index/hook/on_record_mp4 +# 录制 hls ts 切片完成事件 +on_record_ts=https://127.0.0.1/index/hook/on_record_ts #rtsp播放鉴权事件,此事件中比对rtsp的用户名密码 on_rtsp_auth=https://127.0.0.1/index/hook/on_rtsp_auth #rtsp播放是否开启专属鉴权事件,置空则关闭rtsp鉴权。rtsp播放鉴权还支持url方式鉴权 diff --git a/server/WebApi.cpp b/server/WebApi.cpp index db39fa06..06f9c063 100644 --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -712,8 +712,6 @@ void installWebApi() { const string &app, const string &stream, const string &url, - bool enable_rtsp, - bool enable_rtmp, bool enable_hls, bool enable_mp4, int rtp_type, @@ -726,7 +724,7 @@ void installWebApi() { return; } //添加拉流代理 - PlayerProxy::Ptr player(new PlayerProxy(vhost,app,stream,enable_rtsp,enable_rtmp,enable_hls,enable_mp4)); + PlayerProxy::Ptr player(new PlayerProxy(vhost, app, stream, enable_hls, enable_mp4)); s_proxyMap[key] = player; //指定RTP over TCP(播放rtsp时有效) @@ -772,13 +770,11 @@ void installWebApi() { //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs api_regist2("/index/api/addStreamProxy",[](API_ARGS2){ CHECK_SECRET(); - CHECK_ARGS("vhost","app","stream","url","enable_rtsp","enable_rtmp"); + CHECK_ARGS("vhost","app","stream","url"); addStreamProxy(allArgs["vhost"], allArgs["app"], allArgs["stream"], allArgs["url"], - allArgs["enable_rtsp"],/* 是否rtsp转发 */ - allArgs["enable_rtmp"],/* 是否rtmp转发 */ allArgs["enable_hls"],/* 是否hls转发 */ allArgs["enable_mp4"],/* 是否MP4录制 */ allArgs["rtp_type"], @@ -1207,8 +1203,6 @@ void installWebApi() { allArgs["stream"], /** 支持rtsp和rtmp方式拉流 ,rtsp支持h265/h264/aac,rtmp仅支持h264/aac **/ "rtsp://184.72.239.149/vod/mp4:BigBuckBunny_115k.mov", - true,/* 开启rtsp转发 */ - true,/* 开启rtmp转发 */ true,/* 开启hls转发 */ false,/* 禁用MP4录制 */ 0,//rtp over tcp方式拉流 diff --git a/server/WebHook.cpp b/server/WebHook.cpp index ec82e974..655f9b8f 100644 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -55,6 +55,7 @@ const string kOnRtspAuth = HOOK_FIELD"on_rtsp_auth"; const string kOnStreamChanged = HOOK_FIELD"on_stream_changed"; const string kOnStreamNotFound = HOOK_FIELD"on_stream_not_found"; const string kOnRecordMp4 = HOOK_FIELD"on_record_mp4"; +const string kOnRecordTs = HOOK_FIELD"on_record_ts"; const string kOnShellLogin = HOOK_FIELD"on_shell_login"; const string kOnStreamNoneReader = HOOK_FIELD"on_stream_none_reader"; const string kOnHttpAccess = HOOK_FIELD"on_http_access"; @@ -74,6 +75,7 @@ onceToken token([](){ mINI::Instance()[kOnStreamChanged] = "https://127.0.0.1/index/hook/on_stream_changed"; mINI::Instance()[kOnStreamNotFound] = "https://127.0.0.1/index/hook/on_stream_not_found"; mINI::Instance()[kOnRecordMp4] = "https://127.0.0.1/index/hook/on_record_mp4"; + mINI::Instance()[kOnRecordTs] = "https://127.0.0.1/index/hook/on_record_ts"; mINI::Instance()[kOnShellLogin] = "https://127.0.0.1/index/hook/on_shell_login"; mINI::Instance()[kOnStreamNoneReader] = "https://127.0.0.1/index/hook/on_stream_none_reader"; mINI::Instance()[kOnHttpAccess] = "https://127.0.0.1/index/hook/on_http_access"; @@ -195,6 +197,7 @@ void installWebHook(){ GET_CONFIG(string,hook_stream_chaned,Hook::kOnStreamChanged); GET_CONFIG(string,hook_stream_not_found,Hook::kOnStreamNotFound); GET_CONFIG(string,hook_record_mp4,Hook::kOnRecordMp4); + GET_CONFIG(string,hook_record_ts,Hook::kOnRecordTs); GET_CONFIG(string,hook_shell_login,Hook::kOnShellLogin); GET_CONFIG(string,hook_stream_none_reader,Hook::kOnStreamNoneReader); GET_CONFIG(string,hook_http_access,Hook::kOnHttpAccess); @@ -375,28 +378,40 @@ void installWebHook(){ do_http_hook(hook_stream_not_found,body, nullptr); }); + static auto getRecordInfo = [](const RecordInfo &info) { + ArgsType body; + body["start_time"] = (Json::UInt64) info.start_time; + body["file_size"] = (Json::UInt64) info.file_size; + body["time_len"] = info.time_len; + body["file_path"] = info.file_path; + body["file_name"] = info.file_name; + body["folder"] = info.folder; + body["url"] = info.url; + body["app"] = info.app; + body["stream"] = info.stream; + body["vhost"] = info.vhost; + return body; + }; + #ifdef ENABLE_MP4 //录制mp4文件成功后广播 NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastRecordMP4,[](BroadcastRecordMP4Args){ - if(!hook_enable || hook_record_mp4.empty()){ + if (!hook_enable || hook_record_mp4.empty()) { return; } - ArgsType body; - body["start_time"] = (Json::UInt64)info.ui64StartedTime; - body["time_len"] = (Json::UInt64)info.ui64TimeLen; - body["file_size"] = (Json::UInt64)info.ui64FileSize; - body["file_path"] = info.strFilePath; - body["file_name"] = info.strFileName; - body["folder"] = info.strFolder; - body["url"] = info.strUrl; - body["app"] = info.strAppName; - body["stream"] = info.strStreamId; - body["vhost"] = info.strVhost; //执行hook - do_http_hook(hook_record_mp4,body, nullptr); + do_http_hook(hook_record_mp4, getRecordInfo(info), nullptr); }); #endif //ENABLE_MP4 + NoticeCenter::Instance().addListener(nullptr, Broadcast::kBroadcastRecordTs, [](BroadcastRecordTsArgs) { + if (!hook_enable || hook_record_ts.empty()) { + return; + } + // 执行 hook + do_http_hook(hook_record_ts, getRecordInfo(info), nullptr); + }); + NoticeCenter::Instance().addListener(nullptr,Broadcast::kBroadcastShellLogin,[](BroadcastShellLoginArgs){ if(!hook_enable || hook_shell_login.empty() || sender.get_peer_ip() == "127.0.0.1"){ invoker(""); @@ -435,7 +450,6 @@ void installWebHook(){ } strongSrc->close(false); }); - }); /** diff --git a/src/Common/Device.cpp b/src/Common/Device.cpp index 0231600f..0b08289d 100644 --- a/src/Common/Device.cpp +++ b/src/Common/Device.cpp @@ -20,9 +20,9 @@ using namespace toolkit; namespace mediakit { -DevChannel::DevChannel(const string &vhost, const string &app, const string &stream_id, float duration, - bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4) : - MultiMediaSourceMuxer(vhost, app, stream_id, duration, enable_rtsp, enable_rtmp, enable_hls, enable_mp4) {} +DevChannel::DevChannel(const string &vhost, const string &app, const string &stream_id, + float duration, bool enable_hls, bool enable_mp4) : + MultiMediaSourceMuxer(vhost, app, stream_id, duration, true, true, enable_hls, enable_mp4) {} DevChannel::~DevChannel() {} diff --git a/src/Common/Device.h b/src/Common/Device.h index dbff5424..5f726f79 100644 --- a/src/Common/Device.h +++ b/src/Common/Device.h @@ -53,8 +53,8 @@ class DevChannel : public MultiMediaSourceMuxer{ public: typedef std::shared_ptr Ptr; //fDuration<=0为直播,否则为点播 - DevChannel(const string &vhost, const string &app, const string &stream_id, float duration = 0, - bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false); + DevChannel(const string &vhost, const string &app, const string &stream_id, + float duration = 0, bool enable_hls = true, bool enable_mp4 = false); ~DevChannel() override ; diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index b19b1735..2b7b80bb 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -220,7 +220,7 @@ static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, con } if(!ret && create_new && schema != HLS_SCHEMA){ - //未查找媒体源,则创建一个 + //未查找媒体源,则读取mp4创建一个 //播放hls不触发mp4点播(因为HLS也可以用于录像,不是纯粹的直播) ret = MediaSource::createFromMP4(schema, vhost, app, id); } @@ -364,8 +364,14 @@ bool MediaSource::unregist() { /////////////////////////////////////MediaInfo////////////////////////////////////// -void MediaInfo::parse(const string &url){ - //string url = "rtsp://127.0.0.1:8554/live/id?key=val&a=1&&b=2&vhost=vhost.com"; +void MediaInfo::parse(const string &url_in){ + string url = url_in; + auto pos = url.find("?"); + if (pos != string::npos) { + _param_strs = url.substr(pos + 1); + url.erase(pos); + } + auto schema_pos = url.find("://"); if (schema_pos != string::npos) { _schema = url.substr(0, schema_pos); @@ -382,12 +388,10 @@ void MediaInfo::parse(const string &url){ } else { _host = _vhost = vhost; } - if (_vhost == "localhost" || INADDR_NONE != inet_addr(_vhost.data())) { //如果访问的是localhost或ip,那么则为默认虚拟主机 _vhost = DEFAULT_VHOST; } - } if (split_vec.size() > 1) { _app = split_vec[1]; @@ -400,17 +404,12 @@ void MediaInfo::parse(const string &url){ if (stream_id.back() == '/') { stream_id.pop_back(); } - auto pos = stream_id.find("?"); - if (pos != string::npos) { - _streamid = stream_id.substr(0, pos); - _param_strs = stream_id.substr(pos + 1); - auto params = Parser::parseArgs(_param_strs); - if (params.find(VHOST_KEY) != params.end()) { - _vhost = params[VHOST_KEY]; - } - } else { - _streamid = stream_id; - } + _streamid = stream_id; + } + + auto params = Parser::parseArgs(_param_strs); + if (params.find(VHOST_KEY) != params.end()) { + _vhost = params[VHOST_KEY]; } GET_CONFIG(bool, enableVhost, General::kEnableVhost); diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 2d9ab082..5156474c 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -32,6 +32,9 @@ MultiMuxerPrivate::MultiMuxerPrivate(const string &vhost, const string &app, con if (enable_mp4) { _mp4 = Recorder::createRecorder(Recorder::type_mp4, vhost, app, stream); } + + _ts = std::make_shared(vhost, app, stream); + _fmp4 = std::make_shared(vhost, app, stream); } void MultiMuxerPrivate::resetTracks() { @@ -41,6 +44,12 @@ void MultiMuxerPrivate::resetTracks() { if (_rtsp) { _rtsp->resetTracks(); } + if (_ts) { + _ts->resetTracks(); + } + if (_fmp4) { + _fmp4->resetTracks(); + } //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 auto hls = _hls; @@ -62,6 +71,12 @@ void MultiMuxerPrivate::setMediaListener(const std::weak_ptr & if (_rtsp) { _rtsp->setListener(listener); } + if (_ts) { + _ts->setListener(listener); + } + if (_fmp4) { + _fmp4->setListener(listener); + } auto hls = _hls; if (hls) { hls->setListener(listener); @@ -70,7 +85,11 @@ void MultiMuxerPrivate::setMediaListener(const std::weak_ptr & int MultiMuxerPrivate::totalReaderCount() const { auto hls = _hls; - return (_rtsp ? _rtsp->readerCount() : 0) + (_rtmp ? _rtmp->readerCount() : 0) + (hls ? hls->readerCount() : 0); + return (_rtsp ? _rtsp->readerCount() : 0) + + (_rtmp ? _rtmp->readerCount() : 0) + + (_ts ? _ts->readerCount() : 0) + + (_fmp4 ? _fmp4->readerCount() : 0) + + (hls ? hls->readerCount() : 0); } static std::shared_ptr makeRecorder(const vector &tracks, Recorder::type type, const string &custom_path, MediaSource &sender){ @@ -166,6 +185,12 @@ void MultiMuxerPrivate::onTrackReady(const Track::Ptr &track) { if (_rtsp) { _rtsp->addTrack(track); } + if (_ts) { + _ts->addTrack(track); + } + if (_fmp4) { + _fmp4->addTrack(track); + } //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 auto hls = _hls; @@ -187,6 +212,8 @@ bool MultiMuxerPrivate::isEnabled(){ auto hls = _hls; return (_rtmp ? _rtmp->isEnabled() : false) || (_rtsp ? _rtsp->isEnabled() : false) || + (_ts ? _ts->isEnabled() : false) || + (_fmp4 ? _fmp4->isEnabled() : false) || (hls ? hls->isEnabled() : false) || _mp4; } @@ -197,6 +224,13 @@ void MultiMuxerPrivate::onTrackFrame(const Frame::Ptr &frame) { if (_rtsp) { _rtsp->inputFrame(frame); } + if (_ts) { + _ts->inputFrame(frame); + } + if (_fmp4) { + _fmp4->inputFrame(frame); + } + //拷贝智能指针,目的是为了防止跨线程调用设置录像相关api导致的线程竞争问题 //此处使用智能指针拷贝来确保线程安全,比互斥锁性能更优 auto hls = _hls; @@ -252,6 +286,9 @@ void MultiMuxerPrivate::onAllTrackReady() { if (_rtsp) { _rtsp->onAllTrackReady(); } + if (_fmp4) { + _fmp4->onAllTrackReady(); + } if (_track_listener) { _track_listener->onAllTrackReady(); } diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index da328111..4edaa416 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -18,6 +18,8 @@ #include "Record/HlsMediaSource.h" #include "Rtsp/RtspMediaSourceMuxer.h" #include "Rtmp/RtmpMediaSourceMuxer.h" +#include "TS/TSMediaSourceMuxer.h" +#include "FMP4/FMP4MediaSourceMuxer.h" namespace mediakit{ @@ -57,6 +59,8 @@ private: HlsRecorder::Ptr _hls; MediaSinkInterface::Ptr _mp4; MediaSinkInterface::Ptr _hls_record; + TSMediaSourceMuxer::Ptr _ts; + FMP4MediaSourceMuxer::Ptr _fmp4; std::weak_ptr _listener; }; diff --git a/src/Common/config.cpp b/src/Common/config.cpp index 69e80d8e..11b9ad37 100644 --- a/src/Common/config.cpp +++ b/src/Common/config.cpp @@ -40,6 +40,7 @@ bool loadIniConfig(const char *ini_path){ namespace Broadcast { const string kBroadcastMediaChanged = "kBroadcastMediaChanged"; const string kBroadcastRecordMP4 = "kBroadcastRecordMP4"; +const string kBroadcastRecordTs = "kBroadcastRecoredTs"; const string kBroadcastHttpRequest = "kBroadcastHttpRequest"; const string kBroadcastHttpAccess = "kBroadcastHttpAccess"; const string kBroadcastOnGetRtspRealm = "kBroadcastOnGetRtspRealm"; @@ -253,6 +254,8 @@ const string kSegmentRetain = HLS_FIELD"segRetain"; const string kFileBufSize = HLS_FIELD"fileBufSize"; //录制文件路径 const string kFilePath = HLS_FIELD"filePath"; +// 是否广播 ts 切片完成通知 +const string kBroadcastRecordTs = HLS_FIELD"broadcastRecordTs"; onceToken token([](){ mINI::Instance()[kSegmentDuration] = 2; @@ -260,6 +263,7 @@ onceToken token([](){ mINI::Instance()[kSegmentRetain] = 5; mINI::Instance()[kFileBufSize] = 64 * 1024; mINI::Instance()[kFilePath] = "./www"; + mINI::Instance()[kBroadcastRecordTs] = false; },nullptr); } //namespace Hls diff --git a/src/Common/config.h b/src/Common/config.h index d374c53d..f640851b 100644 --- a/src/Common/config.h +++ b/src/Common/config.h @@ -47,6 +47,8 @@ bool loadIniConfig(const char *ini_path = nullptr); #define RTSP_SCHEMA "rtsp" #define RTMP_SCHEMA "rtmp" #define HLS_SCHEMA "hls" +#define TS_SCHEMA "ts" +#define FMP4_SCHEMA "fmp4" #define DEFAULT_VHOST "__defaultVhost__" ////////////广播名称/////////// @@ -58,7 +60,11 @@ extern const string kBroadcastMediaChanged; //录制mp4文件成功后广播 extern const string kBroadcastRecordMP4; -#define BroadcastRecordMP4Args const MP4Info &info +#define BroadcastRecordMP4Args const RecordInfo &info + +// 录制 ts 文件后广播 +extern const string kBroadcastRecordTs; +#define BroadcastRecordTsArgs const RecordInfo &info //录制hls文件成功后广播 extern const string kBroadcastRecordHls; @@ -289,6 +295,8 @@ extern const string kSegmentRetain; extern const string kFileBufSize; //录制文件路径 extern const string kFilePath; +// 是否广播 ts 切片完成通知 +extern const string kBroadcastRecordTs; } //namespace Hls ////////////Rtp代理相关配置/////////// diff --git a/src/FMP4/FMP4MediaSource.h b/src/FMP4/FMP4MediaSource.h new file mode 100644 index 00000000..277d4bf8 --- /dev/null +++ b/src/FMP4/FMP4MediaSource.h @@ -0,0 +1,148 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_FMP4MEDIASOURCE_H +#define ZLMEDIAKIT_FMP4MEDIASOURCE_H + +#include "Common/MediaSource.h" +using namespace toolkit; +#define FMP4_GOP_SIZE 512 + +namespace mediakit { + +//FMP4直播数据包 +class FMP4Packet : public BufferString{ +public: + using Ptr = std::shared_ptr; + + template + FMP4Packet(ARGS && ...args) : BufferString(std::forward(args)...) {}; + ~FMP4Packet() override = default; + +public: + uint32_t time_stamp = 0; +}; + +//FMP4直播合并写策略类 +class FMP4FlushPolicy : public FlushPolicy{ +public: + FMP4FlushPolicy() = default; + ~FMP4FlushPolicy() = default; + + uint32_t getStamp(const FMP4Packet::Ptr &packet) { + return packet->time_stamp; + } +}; + +//FMP4直播源 +class FMP4MediaSource : public MediaSource, public RingDelegate, public PacketCache{ +public: + using Ptr = std::shared_ptr; + using RingDataType = std::shared_ptr >; + using RingType = RingBuffer; + + FMP4MediaSource(const string &vhost, + const string &app, + const string &stream_id, + int ring_size = FMP4_GOP_SIZE) : MediaSource(FMP4_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} + + ~FMP4MediaSource() override = default; + + /** + * 获取媒体源的环形缓冲 + */ + const RingType::Ptr &getRing() const { + return _ring; + } + + /** + * 获取fmp4 init segment + */ + const string &getInitSegment() const{ + return _init_segment; + } + + /** + * 设置fmp4 init segment + * @param str init segment + */ + void setInitSegment(string str) { + _init_segment = std::move(str); + if (_ring) { + regist(); + } + } + + /** + * 获取播放器个数 + */ + int readerCount() override { + return _ring ? _ring->readerCount() : 0; + } + + /** + * 输入FMP4包 + * @param packet FMP4包 + * @param key 是否为关键帧第一个包 + */ + void onWrite(const FMP4Packet::Ptr &packet, bool key) override { + if (!_ring) { + createRing(); + } + if (key) { + _have_video = true; + } + PacketCache::inputPacket(true, packet, key); + } + + /** + * 情况GOP缓存 + */ + void clearCache() override { + PacketCache::clearCache(); + _ring->clearCache(); + } + +private: + void createRing(){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _ring = std::make_shared(_ring_size, [weak_self](int size) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + strong_self->onReaderChanged(size); + }); + onReaderChanged(0); + if (!_init_segment.empty()) { + regist(); + } + } + + /** + * 合并写回调 + * @param packet_list 合并写缓存列队 + * @param key_pos 是否包含关键帧 + */ + void onFlush(std::shared_ptr > &packet_list, bool key_pos) override { + //如果不存在视频,那么就没有存在GOP缓存的意义,所以确保一直清空GOP缓存 + _ring->write(packet_list, _have_video ? key_pos : true); + } + +private: + bool _have_video = false; + int _ring_size; + string _init_segment; + RingType::Ptr _ring; +}; + + +}//namespace mediakit +#endif //ZLMEDIAKIT_FMP4MEDIASOURCE_H diff --git a/src/FMP4/FMP4MediaSourceMuxer.h b/src/FMP4/FMP4MediaSourceMuxer.h new file mode 100644 index 00000000..6922eef8 --- /dev/null +++ b/src/FMP4/FMP4MediaSourceMuxer.h @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_FMP4MEDIASOURCEMUXER_H +#define ZLMEDIAKIT_FMP4MEDIASOURCEMUXER_H + +#include "FMP4MediaSource.h" +#include "Record/MP4Muxer.h" + +namespace mediakit { + +class FMP4MediaSourceMuxer : public MP4MuxerMemory, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { +public: + using Ptr = std::shared_ptr; + + FMP4MediaSourceMuxer(const string &vhost, + const string &app, + const string &stream_id) { + _media_src = std::make_shared(vhost, app, stream_id); + } + + ~FMP4MediaSourceMuxer() override = default; + + void setListener(const std::weak_ptr &listener){ + _listener = listener; + _media_src->setListener(shared_from_this()); + } + + int readerCount() const{ + return _media_src->readerCount(); + } + + void onReaderChanged(MediaSource &sender, int size) override { + _enabled = size; + if (!size) { + _clear_cache = true; + } + MediaSourceEventInterceptor::onReaderChanged(sender, size); + } + + void inputFrame(const Frame::Ptr &frame) override { + if (_clear_cache) { + _clear_cache = false; + _media_src->clearCache(); + } + if (_enabled) { + MP4MuxerMemory::inputFrame(frame); + } + } + + bool isEnabled() { + //缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存 + return _clear_cache ? true : _enabled; + } + + void onAllTrackReady() { + _media_src->setInitSegment(getInitSegment()); + } + +protected: + void onSegmentData(const string &string, uint32_t stamp, bool key_frame) override { + if (string.empty()) { + return; + } + FMP4Packet::Ptr packet = std::make_shared(std::move(string)); + packet->time_stamp = stamp; + _media_src->onWrite(packet, key_frame); + } + +private: + bool _enabled = true; + bool _clear_cache = false; + FMP4MediaSource::Ptr _media_src; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_FMP4MEDIASOURCEMUXER_H diff --git a/src/Http/HttpFileManager.cpp b/src/Http/HttpFileManager.cpp index 28fca855..f7c2b5bd 100644 --- a/src/Http/HttpFileManager.cpp +++ b/src/Http/HttpFileManager.cpp @@ -27,7 +27,7 @@ static int kHlsCookieSecond = 60; static const string kCookieName = "ZL_COOKIE"; static const string kHlsSuffix = "/hls.m3u8"; -class HttpCookieAttachment{ +class HttpCookieAttachment { public: HttpCookieAttachment() {}; ~HttpCookieAttachment() {}; @@ -160,7 +160,7 @@ const string &HttpFileManager::getContentType(const char *name) { dot = strrchr(name, '.'); static StrCaseMap mapType; static onceToken token([&]() { - for (unsigned int i = 0; i < sizeof (s_mime_src) / sizeof (s_mime_src[0]); ++i) { + for (unsigned int i = 0; i < sizeof(s_mime_src) / sizeof(s_mime_src[0]); ++i) { mapType.emplace(s_mime_src[i][0], s_mime_src[i][1]); } }); @@ -183,8 +183,8 @@ static string searchIndexFile(const string &dir){ } set setFile; while ((pDirent = readdir(pDir)) != NULL) { - static set indexSet = {"index.html","index.htm","index"}; - if(indexSet.find(pDirent->d_name) != indexSet.end()){ + static set indexSet = {"index.html", "index.htm", "index"}; + if (indexSet.find(pDirent->d_name) != indexSet.end()) { string ret = pDirent->d_name; closedir(pDir); return ret; @@ -196,16 +196,16 @@ static string searchIndexFile(const string &dir){ static bool makeFolderMenu(const string &httpPath, const string &strFullPath, string &strRet) { GET_CONFIG(bool, dirMenu, Http::kDirMenu); - if(!dirMenu){ + if (!dirMenu) { //不允许浏览文件夹 return false; } string strPathPrefix(strFullPath); string last_dir_name; - if(strPathPrefix.back() == '/'){ + if (strPathPrefix.back() == '/') { strPathPrefix.pop_back(); - }else{ - last_dir_name = split(strPathPrefix,"/").back(); + } else { + last_dir_name = split(strPathPrefix, "/").back(); } if (!File::is_dir(strPathPrefix.data())) { @@ -249,24 +249,24 @@ static bool makeFolderMenu(const string &httpPath, const string &strFullPath, st if (File::is_special_dir(pDirent->d_name)) { continue; } - if(pDirent->d_name[0] == '.'){ + if (pDirent->d_name[0] == '.') { continue; } setFile.emplace(pDirent->d_name); } int i = 0; - for(auto &strFile :setFile ){ + for (auto &strFile :setFile) { string strAbsolutePath = strPathPrefix + "/" + strFile; bool isDir = File::is_dir(strAbsolutePath.data()); ss << "
  • " << i++ << "\t"; ss << ""; @@ -307,14 +307,14 @@ static bool end_of(const string &str, const string &substr){ //拦截hls的播放请求 static bool emitHlsPlayed(const Parser &parser, const MediaInfo &mediaInfo, const HttpSession::HttpAccessPathInvoker &invoker,TcpSession &sender){ //访问的hls.m3u8结尾,我们转换成kBroadcastMediaPlayed事件 - Broadcast::AuthInvoker mediaAuthInvoker = [invoker](const string &err){ + Broadcast::AuthInvoker auth_invoker = [invoker](const string &err) { //cookie有效期为kHlsCookieSecond - invoker(err,"",kHlsCookieSecond); + invoker(err, "", kHlsCookieSecond); }; - bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,mediaInfo,mediaAuthInvoker,static_cast(sender)); - if(!flag){ + bool flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed, mediaInfo, auth_invoker, static_cast(sender)); + if (!flag) { //未开启鉴权,那么允许播放 - mediaAuthInvoker(""); + auth_invoker(""); } return flag; } @@ -325,23 +325,23 @@ public: SockInfoImp() = default; ~SockInfoImp() override = default; - string get_local_ip() override{ + string get_local_ip() override { return _local_ip; } - uint16_t get_local_port() override{ + uint16_t get_local_port() override { return _local_port; } - string get_peer_ip() override{ + string get_peer_ip() override { return _peer_ip; } - uint16_t get_peer_port() override{ + uint16_t get_peer_port() override { return _peer_port; } - string getIdentifier() const override{ + string getIdentifier() const override { return _identifier; } @@ -384,7 +384,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI //上次cookie是限定本目录 if (attachment._err_msg.empty()) { //上次鉴权成功 - if(attachment._is_hls){ + if (attachment._is_hls) { //如果播放的是hls,那么刷新hls的cookie(获取ts文件也会刷新) cookie->updateTime(); cookie_from_header = false; @@ -434,7 +434,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI attachment._err_msg = errMsg; //记录访问的是否为hls attachment._is_hls = is_hls; - if(is_hls){ + if (is_hls) { //hls相关信息 attachment._hls_data = std::make_shared(mediaInfo, info); //hls未查找MediaSource @@ -442,7 +442,7 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI } (*cookie)[kCookieName].set(std::move(attachment)); callback(errMsg, cookie); - }else{ + } else { callback(errMsg, nullptr); } }; @@ -465,15 +465,15 @@ static void canAccessPath(TcpSession &sender, const Parser &parser, const MediaI * 发送404 Not Found */ static void sendNotFound(const HttpFileManager::invoker &cb) { - GET_CONFIG(string,notFound,Http::kNotFound); - cb("404 Not Found","text/html",StrCaseMap(),std::make_shared(notFound)); + GET_CONFIG(string, notFound, Http::kNotFound); + cb("404 Not Found", "text/html", StrCaseMap(), std::make_shared(notFound)); } /** * 拼接文件路径 */ static string pathCat(const string &a, const string &b){ - if(a.back() == '/'){ + if (a.back() == '/') { return a + b; } return a + '/' + b; @@ -496,7 +496,7 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo return; } - if(is_hls){ + if (is_hls) { //hls,那么移除掉后缀获取真实的stream_id并且修改协议为HLS const_cast(mediaInfo._schema) = HLS_SCHEMA; replace(const_cast(mediaInfo._streamid), kHlsSuffix, ""); @@ -506,7 +506,7 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo //判断是否有权限访问该文件 canAccessPath(sender, parser, mediaInfo, false, [cb, strFile, parser, is_hls, mediaInfo, weakSession , file_exist](const string &errMsg, const HttpServerCookie::Ptr &cookie) { auto strongSession = weakSession.lock(); - if(!strongSession){ + if (!strongSession) { //http客户端已经断开,不需要回复 return; } @@ -514,6 +514,7 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo //文件鉴权失败 StrCaseMap headerOut; if (cookie) { + auto lck = cookie->getLock(); headerOut["Set-Cookie"] = cookie->getCookie((*cookie)[kCookieName].get()._path); } cb("401 Unauthorized", "text/html", headerOut, std::make_shared(errMsg)); @@ -523,11 +524,12 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo auto response_file = [file_exist](const HttpServerCookie::Ptr &cookie, const HttpFileManager::invoker &cb, const string &strFile, const Parser &parser) { StrCaseMap httpHeader; if (cookie) { + auto lck = cookie->getLock(); httpHeader["Set-Cookie"] = cookie->getCookie((*cookie)[kCookieName].get()._path); } HttpSession::HttpResponseInvoker invoker = [&](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { if (cookie && file_exist) { - cookie->getLock(); + auto lck = cookie->getLock(); auto is_hls = (*cookie)[kCookieName].get()._is_hls; if (is_hls) { (*cookie)[kCookieName].get()._hls_data->addByteUsage(body->remainSize()); @@ -541,44 +543,47 @@ static void accessFile(TcpSession &sender, const Parser &parser, const MediaInfo if (!is_hls) { //不是hls,直接回复文件或404 response_file(cookie, cb, strFile, parser); - } else { - //是hls直播,判断是否存在 - bool have_find_media_src = false; - if(cookie){ - have_find_media_src = (*cookie)[kCookieName].get()._have_find_media_source; - if(!have_find_media_src){ - (*cookie)[kCookieName].get()._have_find_media_source = true; - } + return; + } + + //是hls直播,判断HLS直播流是否已经注册 + bool have_find_media_src = false; + if (cookie) { + auto lck = cookie->getLock(); + have_find_media_src = (*cookie)[kCookieName].get()._have_find_media_source; + if (!have_find_media_src) { + (*cookie)[kCookieName].get()._have_find_media_source = true; } - if(have_find_media_src){ - //之前该cookie已经通过MediaSource::findAsync查找过了,所以现在只以文件系统查找结果为准 + } + if (have_find_media_src) { + //之前该cookie已经通过MediaSource::findAsync查找过了,所以现在只以文件系统查找结果为准 + response_file(cookie, cb, strFile, parser); + return; + } + //hls文件不存在,我们等待其生成并延后回复 + MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) { + if (cookie) { + auto lck = cookie->getLock(); + //尝试添加HlsMediaSource的观看人数(HLS是按需生成的,这样可以触发HLS文件的生成) + (*cookie)[kCookieName].get()._hls_data->addByteUsage(0); + } + if (src && File::is_file(strFile.data())) { + //流和m3u8文件都存在,那么直接返回文件 + response_file(cookie, cb, strFile, parser); + return; + } + auto hls = dynamic_pointer_cast(src); + if (!hls) { + //流不存在,那么直接返回文件(相当于纯粹的HLS文件服务器,但是会挂起播放器15秒左右(用于等待HLS流的注册)) response_file(cookie, cb, strFile, parser); return; } - //hls文件不存在,我们等待其生成并延后回复 - MediaSource::findAsync(mediaInfo, strongSession, [response_file, cookie, cb, strFile, parser](const MediaSource::Ptr &src) { - if(cookie){ - //尝试添加HlsMediaSource的观看人数 - (*cookie)[kCookieName].get()._hls_data->addByteUsage(0); - } - if (src && File::is_file(strFile.data())) { - //流和m3u8文件都存在,那么直接返回文件 - response_file(cookie, cb, strFile, parser); - return; - } - auto hls = dynamic_pointer_cast(src); - if (!hls) { - //流不存在,那么直接返回文件 - response_file(cookie, cb, strFile, parser); - return; - } - //流存在,但是m3u8文件不存在,那么等待生成m3u8文件 - hls->waitForFile([response_file, cookie, cb, strFile, parser]() { - response_file(cookie, cb, strFile, parser); - }); + //流存在,但是m3u8文件不存在,那么等待生成m3u8文件(HLS源注册后,并不会立即生成HLS文件,有人观看才会按需生成HLS文件) + hls->waitForFile([response_file, cookie, cb, strFile, parser]() { + response_file(cookie, cb, strFile, parser); }); - } + }); }); } @@ -639,13 +644,13 @@ void HttpFileManager::onAccessPath(TcpSession &sender, Parser &parser, const Htt ////////////////////////////////////HttpResponseInvokerImp////////////////////////////////////// void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) const{ - if(_lambad){ - _lambad(codeOut,headerOut,body); + if (_lambad) { + _lambad(codeOut, headerOut, body); } } void HttpResponseInvokerImp::operator()(const string &codeOut, const StrCaseMap &headerOut, const string &body) const{ - this->operator()(codeOut,headerOut,std::make_shared(body)); + this->operator()(codeOut, headerOut, std::make_shared(body)); } HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda0 &lambda){ @@ -653,23 +658,23 @@ HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::Htt } HttpResponseInvokerImp::HttpResponseInvokerImp(const HttpResponseInvokerImp::HttpResponseInvokerLambda1 &lambda){ - if(!lambda){ + if (!lambda) { _lambad = nullptr; return; } - _lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body){ + _lambad = [lambda](const string &codeOut, const StrCaseMap &headerOut, const HttpBody::Ptr &body) { string str; - if(body && body->remainSize()){ + if (body && body->remainSize()) { str = body->readData(body->remainSize())->toString(); } - lambda(codeOut,headerOut,str); + lambda(codeOut, headerOut, str); }; } void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader, const StrCaseMap &responseHeader, const string &filePath) const { - StrCaseMap &httpHeader = const_cast(responseHeader); + StrCaseMap &httpHeader = const_cast(responseHeader); std::shared_ptr fp(fopen(filePath.data(), "rb"), [](FILE *fp) { if (fp) { fclose(fp); @@ -678,8 +683,8 @@ void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader, if (!fp) { //打开文件失败 - GET_CONFIG(string,notFound,Http::kNotFound); - GET_CONFIG(string,charSet,Http::kCharSet); + GET_CONFIG(string, notFound, Http::kNotFound); + GET_CONFIG(string, charSet, Http::kCharSet); auto strContentType = StrPrinter << "text/html; charset=" << charSet << endl; httpHeader["Content-Type"] = strContentType; @@ -689,14 +694,14 @@ void HttpResponseInvokerImp::responseFile(const StrCaseMap &requestHeader, auto &strRange = const_cast(requestHeader)["Range"]; int64_t iRangeStart = 0; - int64_t iRangeEnd = 0 ; + int64_t iRangeEnd = 0; int64_t fileSize = HttpMultiFormBody::fileSize(fp.get()); const char *pcHttpResult = NULL; if (strRange.size() == 0) { //全部下载 pcHttpResult = "200 OK"; - iRangeEnd = fileSize - 1; + iRangeEnd = fileSize - 1; } else { //分节下载 pcHttpResult = "206 Partial Content"; diff --git a/src/Http/HttpSession.cpp b/src/Http/HttpSession.cpp index 3f173b05..ba996d93 100644 --- a/src/Http/HttpSession.cpp +++ b/src/Http/HttpSession.cpp @@ -8,15 +8,9 @@ * may be found in the AUTHORS file in the root of the source tree. */ -#if !defined(_WIN32) -#include -#endif //!defined(_WIN32) - #include #include #include -#include - #include "Common/config.h" #include "strCoding.h" #include "HttpSession.h" @@ -96,10 +90,10 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) { } void HttpSession::onError(const SockException& err) { - if(_is_flv_stream){ + if(_is_live_stream){ uint64_t duration = _ticker.createdTime()/1000; - //flv播放器 - WarnP(this) << "FLV播放器(" + //flv/ts播放器 + WarnP(this) << "FLV/TS/FMP4播放器(" << _mediaInfo._vhost << "/" << _mediaInfo._app << "/" << _mediaInfo._streamid @@ -107,8 +101,8 @@ void HttpSession::onError(const SockException& err) { << ",耗时(s):" << duration; GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold); - if(_ui64TotalBytes > iFlowThreshold * 1024){ - NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast(*this)); + if(_total_bytes_usage > iFlowThreshold * 1024){ + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration , true, static_cast(*this)); } return; } @@ -135,8 +129,7 @@ bool HttpSession::checkWebSocket(){ if (Sec_WebSocket_Key.empty()) { return false; } - auto Sec_WebSocket_Accept = encodeBase64( - SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); + auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11")); KeyValue headerOut; headerOut["Upgrade"] = "websocket"; @@ -147,17 +140,29 @@ bool HttpSession::checkWebSocket(){ } auto res_cb = [this, headerOut]() { - _flv_over_websocket = true; + _live_over_websocket = true; sendResponse("101 Switching Protocols", false, nullptr, headerOut, nullptr, true); }; //判断是否为websocket-flv - if (checkLiveFlvStream(res_cb)) { + if (checkLiveStreamFlv(res_cb)) { //这里是websocket-flv直播请求 return true; } - //如果checkLiveFlvStream返回false,则代表不是websocket-flv,而是普通的websocket连接 + //判断是否为websocket-ts + if (checkLiveStreamTS(res_cb)) { + //这里是websocket-ts直播请求 + return true; + } + + //判断是否为websocket-fmp4 + if (checkLiveStreamFMP4(res_cb)) { + //这里是websocket-fmp4直播请求 + return true; + } + + //这是普通的websocket连接 if (!onWebSocketConnect(_parser)) { sendResponse("501 Not Implemented", true, nullptr, headerOut); return true; @@ -166,75 +171,59 @@ bool HttpSession::checkWebSocket(){ return true; } -//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 -//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。 -bool HttpSession::checkLiveFlvStream(const function &cb){ - auto pos = strrchr(_parser.Url().data(),'.'); - if(!pos){ - //未找到".flv"后缀 - return false; - } - if(strcasecmp(pos,".flv") != 0){ - //未找到".flv"后缀 +bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix, const function &cb){ + auto pos = strcasestr(_parser.Url().data(), url_suffix.data()); + if (!pos || pos + url_suffix.size() != 1 + &_parser.Url().back()) { + //未找到后缀 return false; } - //这是个.flv的流 - _mediaInfo.parse(string(RTMP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl()); - if(_mediaInfo._app.empty() || _mediaInfo._streamid.size() < 5){ + //这是个符合后缀的直播的流 + _mediaInfo.parse(schema + "://" + _parser["Host"] + _parser.FullUrl()); + if (_mediaInfo._app.empty() || _mediaInfo._streamid.size() < url_suffix.size() + 1) { //url不合法 return false; } - _mediaInfo._streamid.erase(_mediaInfo._streamid.size() - 4);//去除.flv后缀 - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); - - weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); + //去除后缀 + bool close_flag = !strcasecmp(_parser["Connection"].data(), "close"); + //流id去除后缀 + _mediaInfo._streamid.erase(_mediaInfo._streamid.size() - url_suffix.size()); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); //鉴权结果回调 - auto onRes = [cb, weakSelf, bClose](const string &err){ - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + auto onRes = [cb, weak_self, close_flag](const string &err) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } - if(!err.empty()){ + if (!err.empty()) { //播放鉴权失败 - strongSelf->sendResponse("401 Unauthorized", bClose, nullptr, KeyValue(), std::make_shared(err)); + strong_self->sendResponse("401 Unauthorized", close_flag, nullptr, KeyValue(), std::make_shared(err)); return; } - //异步查找rtmp流 - MediaSource::findAsync(strongSelf->_mediaInfo, strongSelf, [weakSelf, bClose, cb](const MediaSource::Ptr &src) { - auto strongSelf = weakSelf.lock(); - if (!strongSelf) { + //异步查找直播流 + MediaSource::findAsync(strong_self->_mediaInfo, strong_self, [weak_self, close_flag, cb](const MediaSource::Ptr &src) { + auto strong_self = weak_self.lock(); + if (!strong_self) { //本对象已经销毁 return; } - auto rtmp_src = dynamic_pointer_cast(src); - if (!rtmp_src) { + if (!src) { //未找到该流 - strongSelf->sendNotFound(bClose); + strong_self->sendNotFound(close_flag); return; } - - if (!cb) { - //找到rtmp源,发送http头,负载后续发送 - strongSelf->sendResponse("200 OK", false, "video/x-flv", KeyValue(), nullptr, true); - } else { - //自定义发送http头 - cb(); - } - - //http-flv直播牺牲延时提升发送性能 - strongSelf->setSocketFlags(); - strongSelf->start(strongSelf->getPoller(), rtmp_src); - strongSelf->_is_flv_stream = true; + strong_self->_is_live_stream = true; + //触发回调 + cb(src); }); }; - Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) { - auto strongSelf = weakSelf.lock(); + Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) { + auto strongSelf = weak_self.lock(); if (!strongSelf) { return; } @@ -251,34 +240,142 @@ bool HttpSession::checkLiveFlvStream(const function &cb){ return true; } +//http-fmp4 链接格式:http://vhost-url:port/app/streamid.live.mp4?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamFMP4(const function &cb){ + return checkLiveStream(FMP4_SCHEMA, ".live.mp4", [this, cb](const MediaSource::Ptr &src) { + auto fmp4_src = dynamic_pointer_cast(src); + assert(fmp4_src); + if (!cb) { + //找到源,发送http头,负载后续发送 + sendResponse("200 OK", false, HttpFileManager::getContentType(".mp4").data(), KeyValue(), nullptr, true); + } else { + //自定义发送http头 + cb(); + } + + //直播牺牲延时提升发送性能 + setSocketFlags(); + onWrite(std::make_shared(fmp4_src->getInitSegment()), true); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _fmp4_reader = fmp4_src->getRing()->attach(getPoller()); + _fmp4_reader->setDetachCB([weak_self]() { + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } + strong_self->shutdown(SockException(Err_shutdown, "fmp4 ring buffer detached")); + }); + _fmp4_reader->setReadCB([weak_self](const FMP4MediaSource::RingDataType &fmp4_list) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } + int i = 0; + int size = fmp4_list->size(); + fmp4_list->for_each([&](const FMP4Packet::Ptr &ts) { + strong_self->onWrite(ts, ++i == size); + }); + }); + }); +} + +//http-ts 链接格式:http://vhost-url:port/app/streamid.live.ts?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamTS(const function &cb){ + return checkLiveStream(TS_SCHEMA, ".live.ts", [this, cb](const MediaSource::Ptr &src) { + auto ts_src = dynamic_pointer_cast(src); + assert(ts_src); + if (!cb) { + //找到源,发送http头,负载后续发送 + sendResponse("200 OK", false, HttpFileManager::getContentType(".ts").data(), KeyValue(), nullptr, true); + } else { + //自定义发送http头 + cb(); + } + + //直播牺牲延时提升发送性能 + setSocketFlags(); + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _ts_reader = ts_src->getRing()->attach(getPoller()); + _ts_reader->setDetachCB([weak_self](){ + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } + strong_self->shutdown(SockException(Err_shutdown,"ts ring buffer detached")); + }); + _ts_reader->setReadCB([weak_self](const TSMediaSource::RingDataType &ts_list) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + //本对象已经销毁 + return; + } + int i = 0; + int size = ts_list->size(); + ts_list->for_each([&](const TSPacket::Ptr &ts) { + strong_self->onWrite(ts, ++i == size); + }); + }); + }); +} + +//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2 +bool HttpSession::checkLiveStreamFlv(const function &cb){ + return checkLiveStream(RTMP_SCHEMA, ".flv", [this, cb](const MediaSource::Ptr &src) { + auto rtmp_src = dynamic_pointer_cast(src); + assert(rtmp_src); + if (!cb) { + //找到源,发送http头,负载后续发送 + sendResponse("200 OK", false, HttpFileManager::getContentType(".flv").data(), KeyValue(), nullptr, true); + } else { + //自定义发送http头 + cb(); + } + //直播牺牲延时提升发送性能 + setSocketFlags(); + start(getPoller(), rtmp_src); + }); +} + void HttpSession::Handle_Req_GET(int64_t &content_len) { Handle_Req_GET_l(content_len, true); } void HttpSession::Handle_Req_GET_l(int64_t &content_len, bool sendBody) { //先看看是否为WebSocket请求 - if(checkWebSocket()){ + if (checkWebSocket()) { content_len = -1; - _contentCallBack = [this](const char *data,uint64_t len){ - WebSocketSplitter::decode((uint8_t *)data,len); + _contentCallBack = [this](const char *data, uint64_t len) { + WebSocketSplitter::decode((uint8_t *) data, len); //_contentCallBack是可持续的,后面还要处理后续数据 return true; }; return; } - if(emitHttpEvent(false)){ + if (emitHttpEvent(false)) { //拦截http api事件 return; } - if(checkLiveFlvStream()){ + if (checkLiveStreamFlv()) { //拦截http-flv播放器 return; } - bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); + if (checkLiveStreamTS()) { + //拦截http-ts播放器 + return; + } + if (checkLiveStreamFMP4()) { + //拦截http-fmp4播放器 + return; + } + + bool bClose = !strcasecmp(_parser["Connection"].data(),"close"); weak_ptr weakSelf = dynamic_pointer_cast(shared_from_this()); HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](const string &status_code, const string &content_type, const StrCaseMap &responseHeader, const HttpBody::Ptr &body) { @@ -623,26 +720,26 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) { } _ticker.resetTime(); - if(!_flv_over_websocket){ - _ui64TotalBytes += buffer->size(); + if (!_live_over_websocket) { + _total_bytes_usage += buffer->size(); send(buffer); - }else{ + } else { WebSocketHeader header; header._fin = true; header._reserved = 0; header._opcode = WebSocketHeader::BINARY; header._mask_flag = false; - WebSocketSplitter::encode(header,buffer); + WebSocketSplitter::encode(header, buffer); } - if(flush){ + if (flush) { //本次刷新缓存后,下次不用刷新缓存 HttpSession::setSendFlushFlag(false); } } void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){ - _ui64TotalBytes += buffer->size(); + _total_bytes_usage += buffer->size(); send(buffer); } diff --git a/src/Http/HttpSession.h b/src/Http/HttpSession.h index b842457b..f1df5a8d 100644 --- a/src/Http/HttpSession.h +++ b/src/Http/HttpSession.h @@ -19,6 +19,8 @@ #include "WebSocketSplitter.h" #include "HttpCookieManager.h" #include "HttpFileManager.h" +#include "TS/TSMediaSource.h" +#include "FMP4/FMP4MediaSource.h" using namespace std; using namespace toolkit; @@ -104,7 +106,12 @@ private: void Handle_Req_POST(int64_t &content_len); void Handle_Req_HEAD(int64_t &content_len); - bool checkLiveFlvStream(const function &cb = nullptr); + bool checkLiveStream(const string &schema, const string &url_suffix, const function &cb); + + bool checkLiveStreamFlv(const function &cb = nullptr); + bool checkLiveStreamTS(const function &cb = nullptr); + bool checkLiveStreamFMP4(const function &fmp4_list = nullptr); + bool checkWebSocket(); bool emitHttpEvent(bool doInvoke); void urlDecode(Parser &parser); @@ -117,17 +124,18 @@ private: void setSocketFlags(); private: + bool _is_live_stream = false; + bool _live_over_websocket = false; + //消耗的总流量 + uint64_t _total_bytes_usage = 0; string _origin; Parser _parser; Ticker _ticker; - //消耗的总流量 - uint64_t _ui64TotalBytes = 0; - //flv over http MediaInfo _mediaInfo; + TSMediaSource::RingType::RingReader::Ptr _ts_reader; + FMP4MediaSource::RingType::RingReader::Ptr _fmp4_reader; //处理content数据的callback function _contentCallBack; - bool _flv_over_websocket = false; - bool _is_flv_stream = false; }; diff --git a/src/Player/PlayerProxy.cpp b/src/Player/PlayerProxy.cpp index 83d4b5cb..e63b5a6a 100644 --- a/src/Player/PlayerProxy.cpp +++ b/src/Player/PlayerProxy.cpp @@ -47,13 +47,11 @@ static uint8_t s_mute_adts[] = {0xff, 0xf1, 0x6c, 0x40, 0x2d, 0x3f, 0xfc, 0x00, #define MUTE_ADTS_DATA_MS 130 PlayerProxy::PlayerProxy(const string &vhost, const string &app, const string &stream_id, - bool enable_rtsp, bool enable_rtmp, bool enable_hls, bool enable_mp4, - int retry_count, const EventPoller::Ptr &poller) : MediaPlayer(poller){ + bool enable_hls, bool enable_mp4, int retry_count, const EventPoller::Ptr &poller) + : MediaPlayer(poller) { _vhost = vhost; _app = app; _stream_id = stream_id; - _enable_rtsp = enable_rtsp; - _enable_rtmp = enable_rtmp; _enable_hls = enable_hls; _enable_mp4 = enable_mp4; _retry_count = retry_count; @@ -119,15 +117,12 @@ void PlayerProxy::play(const string &strUrlTmp) { if(dynamic_pointer_cast(_delegate)){ //rtsp拉流 GET_CONFIG(bool,directProxy,Rtsp::kDirectProxy); - if(directProxy && _enable_rtsp){ + if(directProxy){ mediaSource = std::make_shared(_vhost, _app, _stream_id); } } else if(dynamic_pointer_cast(_delegate)){ - //rtmp拉流 - if(_enable_rtmp){ - //rtmp强制直接代理 - mediaSource = std::make_shared(_vhost, _app, _stream_id); - } + //rtmp拉流,rtmp强制直接代理 + mediaSource = std::make_shared(_vhost, _app, _stream_id); } if(mediaSource){ setMediaSouce(mediaSource); @@ -224,17 +219,17 @@ void PlayerProxy::onPlaySuccess() { if (dynamic_pointer_cast(_pMediaSrc)) { //rtsp拉流代理 if (resetWhenRePlay || !_muxer) { - _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), false, _enable_rtmp, _enable_hls, _enable_mp4)); + _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), false, true, _enable_hls, _enable_mp4)); } } else if (dynamic_pointer_cast(_pMediaSrc)) { //rtmp拉流代理 if (resetWhenRePlay || !_muxer) { - _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), _enable_rtsp, false, _enable_hls, _enable_mp4)); + _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), true, false, _enable_hls, _enable_mp4)); } } else { //其他拉流代理 if (resetWhenRePlay || !_muxer) { - _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), _enable_rtsp, _enable_rtmp, _enable_hls, _enable_mp4)); + _muxer.reset(new MultiMediaSourceMuxer(_vhost, _app, _stream_id, getDuration(), true, true, _enable_hls, _enable_mp4)); } } _muxer->setMediaListener(shared_from_this()); diff --git a/src/Player/PlayerProxy.h b/src/Player/PlayerProxy.h index 999786f4..35089ee5 100644 --- a/src/Player/PlayerProxy.h +++ b/src/Player/PlayerProxy.h @@ -27,7 +27,7 @@ public: //如果retry_count<0,则一直重试播放;否则重试retry_count次数 //默认一直重试 PlayerProxy(const string &vhost, const string &app, const string &stream_id, - bool enable_rtsp = true, bool enable_rtmp = true, bool enable_hls = true, bool enable_mp4 = false, + bool enable_hls = true, bool enable_mp4 = false, int retry_count = -1, const EventPoller::Ptr &poller = nullptr); ~PlayerProxy() override; @@ -63,8 +63,6 @@ private: void onPlaySuccess(); private: - bool _enable_rtsp; - bool _enable_rtmp; bool _enable_hls; bool _enable_mp4; int _retry_count; diff --git a/src/Record/HlsMaker.cpp b/src/Record/HlsMaker.cpp index 62a741c7..3d5e0e70 100644 --- a/src/Record/HlsMaker.cpp +++ b/src/Record/HlsMaker.cpp @@ -135,9 +135,10 @@ void HlsMaker::flushLastSegment(bool eof){ } _seg_dur_list.push_back(std::make_tuple(seg_dur, std::move(_last_file_name))); + _last_file_name.clear(); delOldSegment(); makeIndexFile(eof); - _last_file_name.clear(); + onFlushLastSegment(seg_dur); } bool HlsMaker::isLive() { diff --git a/src/Record/HlsMaker.h b/src/Record/HlsMaker.h index 04e2ea21..63a69767 100644 --- a/src/Record/HlsMaker.h +++ b/src/Record/HlsMaker.h @@ -79,14 +79,20 @@ protected: virtual void onWriteHls(const char *data, int len) = 0; /** - * 关闭上个ts切片并且写入m3u8索引 - * @param timestamp 毫秒时间戳 - * @param eof HLS直播是否已结束 + * 上一个 ts 切片写入完成, 可在这里进行通知处理 + * @param duration_ms 上一个 ts 切片的时长, 单位为毫秒 */ - void flushLastSegment(bool eof = false); + virtual void onFlushLastSegment(uint32_t duration_ms) {}; + virtual void onWriteRecordM3u8(const char *header, int hlen,const char *body,int blen) = 0; + /** + * 关闭上个ts切片并且写入m3u8索引 + * @param eof HLS直播是否已结束 + */ + void flushLastSegment(bool eof); + private: /** * 生成m3u8文件 diff --git a/src/Record/HlsMakerImp.cpp b/src/Record/HlsMakerImp.cpp index b0d1aa2f..22627b3e 100644 --- a/src/Record/HlsMakerImp.cpp +++ b/src/Record/HlsMakerImp.cpp @@ -8,6 +8,8 @@ * may be found in the AUTHORS file in the root of the source tree. */ +#include +#include #include "HlsMakerImp.h" #include "Util/util.h" #include "Util/uv_errno.h" @@ -27,7 +29,9 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file, _file_buf.reset(new char[bufSize],[](char *ptr){ delete[] ptr; }); + _ui64StartedTime = ::time(nullptr); + _info.folder = _path_prefix; } HlsMakerImp::~HlsMakerImp() { @@ -70,14 +74,22 @@ string HlsMakerImp::onOpenSegment(int index) { _segment_file_paths.emplace(index,segment_path); } } + _file = makeFile(segment_path, true); - if(!_file){ + + //保存本切片的元数据 + _info.start_time = ::time(NULL); + _info.file_name = segment_name; + _info.file_path = segment_path; + _info.url = _info.app + "/" + _info.stream + "/" + segment_name; + + if (!_file) { WarnL << "create file failed," << segment_path << " " << get_uv_errmsg(); } - if(_params.empty()){ - return std::move(segment_name); + if (_params.empty()) { + return segment_name; } - return std::move(segment_name + "?" + _params); + return segment_name + "?" + _params; } void HlsMakerImp::onDelSegment(int index) { @@ -122,7 +134,6 @@ std::shared_ptr HlsMakerImp::makeFile(const string &file,bool setbuf) { return ret; } - void HlsMakerImp::onWriteRecordM3u8(const char *header, int hlen,const char *body,int blen){ bool exist = true; string mode = "r+"; @@ -154,6 +165,18 @@ void HlsMakerImp::onWriteRecordM3u8(const char *header, int hlen,const char *bod DebugL << "_path_hls " << _path_hls; } +void HlsMakerImp::onFlushLastSegment(uint32_t duration_ms) { + GET_CONFIG(bool, broadcastRecordTs, Hls::kBroadcastRecordTs); + if (broadcastRecordTs) { + //关闭ts文件以便获取正确的文件大小 + _file = nullptr; + _info.time_len = duration_ms / 1000.0; + struct stat fileData; + stat(_info.file_path.data(), &fileData); + _info.file_size = fileData.st_size; + NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordTs, _info); + } +} std::shared_ptr HlsMakerImp::makeRecordM3u8(const string &file,const string &mode,bool setbuf) { auto file_buf = _file_buf; @@ -169,10 +192,11 @@ std::shared_ptr HlsMakerImp::makeRecordM3u8(const string &file,const strin return ret; } - - void HlsMakerImp::setMediaSource(const string &vhost, const string &app, const string &stream_id) { _media_src = std::make_shared(vhost, app, stream_id); + _info.app = app; + _info.stream = stream_id; + _info.vhost = vhost; } HlsMediaSource::Ptr HlsMakerImp::getMediaSource() const { diff --git a/src/Record/HlsMakerImp.h b/src/Record/HlsMakerImp.h index ca12b003..6f94a81a 100644 --- a/src/Record/HlsMakerImp.h +++ b/src/Record/HlsMakerImp.h @@ -16,6 +16,7 @@ #include #include "HlsMaker.h" #include "HlsMediaSource.h" + using namespace std; namespace mediakit { @@ -64,6 +65,7 @@ protected: void onWriteSegment(const char *data, int len) override; void onWriteHls(const char *data, int len) override; void onWriteRecordM3u8(const char *header, int hlen, const char *body, int blen) override; + void onFlushLastSegment(uint32_t duration_ms) override; private: std::shared_ptr makeFile(const string &file,bool setbuf = false); @@ -73,6 +75,7 @@ private: string _params; string _path_hls; string _path_prefix; + RecordInfo _info; std::shared_ptr _file; std::shared_ptr _file_buf; HlsMediaSource::Ptr _media_src; diff --git a/src/Record/MP4.cpp b/src/Record/MP4.cpp index 0dd1756a..684e4be3 100644 --- a/src/Record/MP4.cpp +++ b/src/Record/MP4.cpp @@ -13,34 +13,125 @@ #include "Util/File.h" #include "Util/logger.h" #include "Common/config.h" +#include "fmp4-writer.h" + using namespace toolkit; namespace mediakit { +/////////////////////////////////////////////////mp4_writer_t///////////////////////////////////////////////// + +struct mp4_writer_t { + int is_fmp4; + union { + fmp4_writer_t *fmp4; + mov_writer_t *mov; + } u; +}; + +mp4_writer_t* mp4_writer_create(int is_fmp4, const struct mov_buffer_t *buffer, void* param, int flags){ + mp4_writer_t *mp4 = (mp4_writer_t *) malloc(sizeof(mp4_writer_t)); + mp4->is_fmp4 = is_fmp4; + if (is_fmp4) { + mp4->u.fmp4 = fmp4_writer_create(buffer, param, flags); + } else { + mp4->u.mov = mov_writer_create(buffer, param, flags); + } + return mp4; +} + +void mp4_writer_destroy(mp4_writer_t* mp4){ + if (mp4->is_fmp4) { + fmp4_writer_destroy(mp4->u.fmp4); + } else { + mov_writer_destroy(mp4->u.mov); + } + free(mp4); +} + +int mp4_writer_add_audio(mp4_writer_t* mp4, uint8_t object, int channel_count, int bits_per_sample, int sample_rate, const void* extra_data, size_t extra_data_size){ + if (mp4->is_fmp4) { + return fmp4_writer_add_audio(mp4->u.fmp4, object, channel_count, bits_per_sample, sample_rate, extra_data, extra_data_size); + } else { + return mov_writer_add_audio(mp4->u.mov, object, channel_count, bits_per_sample, sample_rate, extra_data, extra_data_size); + } +} + +int mp4_writer_add_video(mp4_writer_t* mp4, uint8_t object, int width, int height, const void* extra_data, size_t extra_data_size){ + if (mp4->is_fmp4) { + return fmp4_writer_add_video(mp4->u.fmp4, object, width, height, extra_data, extra_data_size); + } else { + return mov_writer_add_video(mp4->u.mov, object, width, height, extra_data, extra_data_size); + } +} + +int mp4_writer_add_subtitle(mp4_writer_t* mp4, uint8_t object, const void* extra_data, size_t extra_data_size){ + if (mp4->is_fmp4) { + return fmp4_writer_add_subtitle(mp4->u.fmp4, object, extra_data, extra_data_size); + } else { + return mov_writer_add_subtitle(mp4->u.mov, object, extra_data, extra_data_size); + } +} + +int mp4_writer_write(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags){ + if (mp4->is_fmp4) { + return fmp4_writer_write(mp4->u.fmp4, track, data, bytes, pts, dts, flags); + } else { + return mov_writer_write(mp4->u.mov, track, data, bytes, pts, dts, flags); + } +} + +int mp4_writer_write_l(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags, int add_nalu_size){ + if (mp4->is_fmp4) { + return fmp4_writer_write_l(mp4->u.fmp4, track, data, bytes, pts, dts, flags, add_nalu_size); + } else { + return mov_writer_write_l(mp4->u.mov, track, data, bytes, pts, dts, flags, add_nalu_size); + } +} + +int mp4_writer_save_segment(mp4_writer_t* mp4){ + if (mp4->is_fmp4) { + return fmp4_writer_save_segment(mp4->u.fmp4); + } else { + return -1; + } +} + +int mp4_writer_init_segment(mp4_writer_t* mp4){ + if (mp4->is_fmp4) { + return fmp4_writer_init_segment(mp4->u.fmp4); + } else { + return -1; + } +} + +/////////////////////////////////////////////////MP4FileIO///////////////////////////////////////////////// + static struct mov_buffer_t s_io = { - [](void* ctx, void* data, uint64_t bytes) { - MP4File *thiz = (MP4File *)ctx; - return thiz->onRead(data,bytes); + [](void *ctx, void *data, uint64_t bytes) { + MP4FileIO *thiz = (MP4FileIO *) ctx; + return thiz->onRead(data, bytes); }, - [](void* ctx, const void* data, uint64_t bytes){ - MP4File *thiz = (MP4File *)ctx; - return thiz->onWrite(data,bytes); + [](void *ctx, const void *data, uint64_t bytes) { + MP4FileIO *thiz = (MP4FileIO *) ctx; + return thiz->onWrite(data, bytes); }, - [](void* ctx, uint64_t offset) { - MP4File *thiz = (MP4File *)ctx; + [](void *ctx, uint64_t offset) { + MP4FileIO *thiz = (MP4FileIO *) ctx; return thiz->onSeek(offset); }, - [](void* ctx){ - MP4File *thiz = (MP4File *)ctx; + [](void *ctx) { + MP4FileIO *thiz = (MP4FileIO *) ctx; return thiz->onTell(); } }; -MP4File::Writer MP4File::createWriter(){ - GET_CONFIG(bool, mp4FastStart, Record::kFastStart); +MP4FileIO::Writer MP4FileIO::createWriter(int flags, bool is_fmp4){ Writer writer; - writer.reset(mov_writer_create(&s_io,this,mp4FastStart ? MOV_FLAG_FASTSTART : 0),[](mov_writer_t *ptr){ + Ptr self = shared_from_this(); + //保存自己的强引用,防止提前释放 + writer.reset(mp4_writer_create(is_fmp4, &s_io,this, flags),[self](mp4_writer_t *ptr){ if(ptr){ - mov_writer_destroy(ptr); + mp4_writer_destroy(ptr); } }); if(!writer){ @@ -49,9 +140,11 @@ MP4File::Writer MP4File::createWriter(){ return writer; } -MP4File::Reader MP4File::createReader(){ +MP4FileIO::Reader MP4FileIO::createReader(){ Reader reader; - reader.reset(mov_reader_create(&s_io,this),[](mov_reader_t *ptr){ + Ptr self = shared_from_this(); + //保存自己的强引用,防止提前释放 + reader.reset(mov_reader_create(&s_io,this),[self](mov_reader_t *ptr){ if(ptr){ mov_reader_destroy(ptr); } @@ -62,15 +155,17 @@ MP4File::Reader MP4File::createReader(){ return reader; } +/////////////////////////////////////////////////////MP4FileDisk///////////////////////////////////////////////////////// + #if defined(_WIN32) || defined(_WIN64) #define fseek64 _fseeki64 -#define ftell64 _ftelli64 + #define ftell64 _ftelli64 #else -#define fseek64 fseek -#define ftell64 ftell + #define fseek64 fseek + #define ftell64 ftell #endif -void MP4File::openFile(const char *file,const char *mode) { +void MP4FileDisk::openFile(const char *file, const char *mode) { //创建文件 auto fp = File::create_file(file, mode); if(!fp){ @@ -98,28 +193,74 @@ void MP4File::openFile(const char *file,const char *mode) { }); } -void MP4File::closeFile() { +void MP4FileDisk::closeFile() { _file = nullptr; } -int MP4File::onRead(void *data, uint64_t bytes) { +int MP4FileDisk::onRead(void *data, uint64_t bytes) { if (bytes == fread(data, 1, bytes, _file.get())){ return 0; } return 0 != ferror(_file.get()) ? ferror(_file.get()) : -1 /*EOF*/; } -int MP4File::onWrite(const void *data, uint64_t bytes) { +int MP4FileDisk::onWrite(const void *data, uint64_t bytes) { return bytes == fwrite(data, 1, bytes, _file.get()) ? 0 : ferror(_file.get()); } -int MP4File::onSeek(uint64_t offset) { +int MP4FileDisk::onSeek(uint64_t offset) { return fseek64(_file.get(), offset, SEEK_SET); } -uint64_t MP4File::onTell() { +uint64_t MP4FileDisk::onTell() { return ftell64(_file.get()); } +/////////////////////////////////////////////////////MP4FileMemory///////////////////////////////////////////////////////// + +string MP4FileMemory::getAndClearMemory(){ + string ret; + ret.swap(_memory); + _offset = 0; + return ret; +} + +uint64_t MP4FileMemory::fileSize() const{ + return _memory.size(); +} + +uint64_t MP4FileMemory::onTell(){ + return _offset; +} + +int MP4FileMemory::onSeek(uint64_t offset){ + if (offset > _memory.size()) { + return -1; + } + _offset = offset; + return 0; +} + +int MP4FileMemory::onRead(void *data, uint64_t bytes){ + if (_offset >= _memory.size()) { + //EOF + return -1; + } + bytes = MIN(bytes, _memory.size() - _offset); + memcpy(data, _memory.data(), bytes); + _offset += bytes; + return 0; +} + +int MP4FileMemory::onWrite(const void *data, uint64_t bytes){ + if (_offset + bytes > _memory.size()) { + //需要扩容 + _memory.resize(_offset + bytes); + } + memcpy((uint8_t *) _memory.data() + _offset, data, bytes); + _offset += bytes; + return 0; +} + }//namespace mediakit #endif //NABLE_MP4RECORD diff --git a/src/Record/MP4.h b/src/Record/MP4.h index b76362a9..3b7ae1c3 100644 --- a/src/Record/MP4.h +++ b/src/Record/MP4.h @@ -23,27 +23,127 @@ using namespace std; namespace mediakit { -class MP4File { -public: - friend struct mov_buffer_t; - typedef std::shared_ptr Writer; - typedef std::shared_ptr Reader; - MP4File() = default; - virtual ~MP4File() = default; +//以下是fmp4/mov的通用接口,简单包装了ireader/media-server的接口 +typedef struct mp4_writer_t mp4_writer_t; +mp4_writer_t* mp4_writer_create(int is_fmp4, const struct mov_buffer_t *buffer, void* param, int flags); +void mp4_writer_destroy(mp4_writer_t* mp4); +int mp4_writer_add_audio(mp4_writer_t* mp4, uint8_t object, int channel_count, int bits_per_sample, int sample_rate, const void* extra_data, size_t extra_data_size); +int mp4_writer_add_video(mp4_writer_t* mp4, uint8_t object, int width, int height, const void* extra_data, size_t extra_data_size); +int mp4_writer_add_subtitle(mp4_writer_t* mp4, uint8_t object, const void* extra_data, size_t extra_data_size); +int mp4_writer_write(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags); +int mp4_writer_write_l(mp4_writer_t* mp4, int track, const void* data, size_t bytes, int64_t pts, int64_t dts, int flags, int add_nalu_size); +int mp4_writer_save_segment(mp4_writer_t* mp4); +int mp4_writer_init_segment(mp4_writer_t* mp4); - Writer createWriter(); - Reader createReader(); - void openFile(const char *file,const char *mode); +//mp4文件IO的抽象接口类 +class MP4FileIO : public std::enable_shared_from_this { +public: + using Ptr = std::shared_ptr; + using Writer = std::shared_ptr; + using Reader = std::shared_ptr; + + MP4FileIO() = default; + virtual ~MP4FileIO() = default; + + /** + * 创建mp4复用器 + * @param flags 支持0、MOV_FLAG_FASTSTART、MOV_FLAG_SEGMENT + * @param is_fmp4 是否为fmp4还是普通mp4 + * @return mp4复用器 + */ + virtual Writer createWriter(int flags, bool is_fmp4 = false); + + /** + * 创建mp4解复用器 + * @return mp4解复用器 + */ + virtual Reader createReader(); + + /** + * 获取文件读写位置 + */ + virtual uint64_t onTell() = 0; + + /** + * seek至文件某处 + * @param offset 文件偏移量 + * @return 是否成功(0成功) + */ + virtual int onSeek(uint64_t offset) = 0; + + /** + * 从文件读取一定数据 + * @param data 数据存放指针 + * @param bytes 指针长度 + * @return 是否成功(0成功) + */ + virtual int onRead(void *data, uint64_t bytes) = 0; + + /** + * 写入文件一定数据 + * @param data 数据指针 + * @param bytes 数据长度 + * @return 是否成功(0成功) + */ + virtual int onWrite(const void *data, uint64_t bytes) = 0; +}; + +//磁盘MP4文件类 +class MP4FileDisk : public MP4FileIO { +public: + using Ptr = std::shared_ptr; + MP4FileDisk() = default; + ~MP4FileDisk() override = default; + + /** + * 打开磁盘文件 + * @param file 文件路径 + * @param mode fopen的方式 + */ + void openFile(const char *file, const char *mode); + + /** + * 关闭磁盘文件 + */ void closeFile(); - int onRead(void* data, uint64_t bytes); - int onWrite(const void* data, uint64_t bytes); - int onSeek( uint64_t offset); - uint64_t onTell(); +protected: + uint64_t onTell() override; + int onSeek(uint64_t offset) override; + int onRead(void *data, uint64_t bytes) override; + int onWrite(const void *data, uint64_t bytes) override; + private: std::shared_ptr _file; }; +class MP4FileMemory : public MP4FileIO{ +public: + using Ptr = std::shared_ptr; + MP4FileMemory() = default; + ~MP4FileMemory() override = default; + + /** + * 获取文件大小 + */ + uint64_t fileSize() const; + + /** + * 获取并清空文件缓存 + */ + string getAndClearMemory(); + +protected: + uint64_t onTell() override; + int onSeek(uint64_t offset) override; + int onRead(void *data, uint64_t bytes) override; + int onWrite(const void *data, uint64_t bytes) override; + +private: + uint64_t _offset = 0; + string _memory; +}; + }//namespace mediakit #endif //NABLE_MP4RECORD #endif //ZLMEDIAKIT_MP4_H diff --git a/src/Record/MP4Demuxer.cpp b/src/Record/MP4Demuxer.cpp index d77e1734..65c4263d 100644 --- a/src/Record/MP4Demuxer.cpp +++ b/src/Record/MP4Demuxer.cpp @@ -19,18 +19,20 @@ using namespace toolkit; namespace mediakit { -MP4Demuxer::MP4Demuxer(const char *file) { - openFile(file,"rb+"); - _mov_reader = createReader(); - getAllTracks(); - _duration_ms = mov_reader_getduration(_mov_reader.get()); -} +MP4Demuxer::MP4Demuxer() {} MP4Demuxer::~MP4Demuxer() { _mov_reader = nullptr; closeFile(); } +void MP4Demuxer::openMP4(const string &file){ + openFile(file.data(),"rb+"); + _mov_reader = createReader(); + getAllTracks(); + _duration_ms = mov_reader_getduration(_mov_reader.get()); +} + int MP4Demuxer::getAllTracks() { static mov_reader_trackinfo_t s_on_track = { [](void *param, uint32_t track, uint8_t object, int width, int height, const void *extra, size_t bytes) { @@ -158,6 +160,8 @@ struct Context{ BufferRaw::Ptr buffer; }; +#define DATA_OFFSET ADTS_HEADER_LEN + Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) { keyFrame = false; eof = false; @@ -172,9 +176,9 @@ Frame::Ptr MP4Demuxer::readFrame(bool &keyFrame, bool &eof) { static mov_onalloc mov_onalloc = [](void *param, int bytes) -> void * { Context *ctx = (Context *) param; ctx->buffer = ctx->thiz->_buffer_pool.obtain(); - ctx->buffer->setCapacity(bytes + 1); - ctx->buffer->setSize(bytes); - return ctx->buffer->data(); + ctx->buffer->setCapacity(bytes + DATA_OFFSET + 1); + ctx->buffer->setSize(bytes + DATA_OFFSET); + return ctx->buffer->data() + DATA_OFFSET; }; Context ctx = {this, 0}; @@ -202,11 +206,11 @@ template class FrameWrapper : public Parent{ public: ~FrameWrapper() = default; - FrameWrapper(const Buffer::Ptr &buf, int64_t pts, int64_t dts, int prefix) : Parent(buf->data(), buf->size(), dts, pts, prefix){ + FrameWrapper(const Buffer::Ptr &buf, int64_t pts, int64_t dts, int prefix, int offset) : Parent(buf->data() + offset, buf->size() - offset, dts, pts, prefix){ _buf = buf; } - FrameWrapper(CodecId codec,const Buffer::Ptr &buf, int64_t pts, int64_t dts, int prefix) : Parent(codec, buf->data(), buf->size(), dts, pts, prefix){ + FrameWrapper(const Buffer::Ptr &buf, int64_t pts, int64_t dts, int prefix, int offset, CodecId codec) : Parent(codec, buf->data() + offset, buf->size() - offset, dts, pts, prefix){ _buf = buf; } @@ -222,37 +226,44 @@ Frame::Ptr MP4Demuxer::makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int6 if (it == _track_to_codec.end()) { return nullptr; } - auto numBytes = buf->size(); - auto pBytes = buf->data(); + auto bytes = buf->size() - DATA_OFFSET; + auto data = buf->data() + DATA_OFFSET; auto codec = it->second->getCodecId(); switch (codec) { case CodecH264 : case CodecH265 : { - uint32_t iOffset = 0; - while (iOffset < numBytes) { - uint32_t iFrameLen; - memcpy(&iFrameLen, pBytes + iOffset, 4); - iFrameLen = ntohl(iFrameLen); - if (iFrameLen + iOffset + 4 > numBytes) { + uint32_t offset = 0; + while (offset < bytes) { + uint32_t frame_len; + memcpy(&frame_len, data + offset, 4); + frame_len = ntohl(frame_len); + if (frame_len + offset + 4 > bytes) { return nullptr; } - memcpy(pBytes + iOffset, "\x0\x0\x0\x1", 4); - iOffset += (iFrameLen + 4); + memcpy(data + offset, "\x0\x0\x0\x1", 4); + offset += (frame_len + 4); } if (codec == CodecH264) { - return std::make_shared >(buf, pts, dts, 4); + return std::make_shared >(buf, pts, dts, 4, DATA_OFFSET); } - return std::make_shared >(buf, pts, dts, 4); + return std::make_shared >(buf, pts, dts, 4, DATA_OFFSET); + } + + case CodecAAC: { + AACTrack::Ptr track = dynamic_pointer_cast(it->second); + assert(track); + //加上adts头 + dumpAacConfig(track->getAacCfg(), buf->size() - DATA_OFFSET, (uint8_t *) buf->data() + (DATA_OFFSET - ADTS_HEADER_LEN), ADTS_HEADER_LEN); + return std::make_shared >(buf, pts, dts, ADTS_HEADER_LEN, DATA_OFFSET - ADTS_HEADER_LEN, codec); } case CodecOpus: - case CodecAAC: case CodecG711A: case CodecG711U: { - return std::make_shared >(codec, buf, pts, dts, 0); + return std::make_shared >(buf, pts, dts, 0, DATA_OFFSET, codec); } - default: - return nullptr; + + default: return nullptr; } } diff --git a/src/Record/MP4Demuxer.h b/src/Record/MP4Demuxer.h index 5565cc22..2512602c 100644 --- a/src/Record/MP4Demuxer.h +++ b/src/Record/MP4Demuxer.h @@ -16,24 +16,60 @@ #include "Util/ResourcePool.h" namespace mediakit { -class MP4Demuxer : public MP4File, public TrackSource{ +class MP4Demuxer : public MP4FileDisk, public TrackSource{ public: typedef std::shared_ptr Ptr; - MP4Demuxer(const char *file); + + /** + * 创建mp4解复用器 + */ + MP4Demuxer(); ~MP4Demuxer() override; + + /** + * 打开文件 + * @param file mp4文件路径 + */ + void openMP4(const string &file); + + /** + * 移动时间轴至某处 + * @param stamp_ms 预期的时间轴位置,单位毫秒 + * @return 时间轴位置 + */ int64_t seekTo(int64_t stamp_ms); + + /** + * 读取一帧数据 + * @param keyFrame 是否为关键帧 + * @param eof 是否文件读取完毕 + * @return 帧数据,可能为空 + */ Frame::Ptr readFrame(bool &keyFrame, bool &eof); - vector getTracks(bool trackReady) const override ; + + /** + * 获取所有Track信息 + * @param trackReady 是否要求track为就绪状态 + * @return 所有Track + */ + vector getTracks(bool trackReady) const override; + + /** + * 获取文件长度 + * @return 文件长度,单位毫秒 + */ uint64_t getDurationMS() const; + private: int getAllTracks(); - void onVideoTrack(uint32_t track_id, uint8_t object, int width, int height, const void* extra, size_t bytes); - void onAudioTrack(uint32_t track_id, uint8_t object, int channel_count, int bit_per_sample, int sample_rate, const void* extra, size_t bytes); - Frame::Ptr makeFrame(uint32_t track_id, const Buffer::Ptr &buf,int64_t pts, int64_t dts); + void onVideoTrack(uint32_t track_id, uint8_t object, int width, int height, const void *extra, size_t bytes); + void onAudioTrack(uint32_t track_id, uint8_t object, int channel_count, int bit_per_sample, int sample_rate, const void *extra, size_t bytes); + Frame::Ptr makeFrame(uint32_t track_id, const Buffer::Ptr &buf, int64_t pts, int64_t dts); + private: - MP4File::Reader _mov_reader; + Reader _mov_reader; uint64_t _duration_ms = 0; - map _track_to_codec; + map _track_to_codec; ResourcePool _buffer_pool; }; diff --git a/src/Record/MP4Muxer.cpp b/src/Record/MP4Muxer.cpp index 72fbbc9d..471b6e09 100644 --- a/src/Record/MP4Muxer.cpp +++ b/src/Record/MP4Muxer.cpp @@ -14,33 +14,57 @@ #include "Extension/H264.h" namespace mediakit{ -MP4Muxer::MP4Muxer(const char *file) { - _file_name = file; - openMP4(); -} +MP4Muxer::MP4Muxer() {} MP4Muxer::~MP4Muxer() { closeMP4(); } -void MP4Muxer::openMP4(){ +void MP4Muxer::openMP4(const string &file){ closeMP4(); - openFile(_file_name.data(), "wb+"); - _mov_writter = createWriter(); + _file_name = file; + _mp4_file = std::make_shared(); + _mp4_file->openFile(_file_name.data(), "wb+"); } + +MP4FileIO::Writer MP4Muxer::createWriter(){ + GET_CONFIG(bool, mp4FastStart, Record::kFastStart); + return _mp4_file->createWriter(mp4FastStart ? MOV_FLAG_FASTSTART : 0, false); +} + void MP4Muxer::closeMP4(){ - _mov_writter = nullptr; - closeFile(); + MP4MuxerInterface::resetTracks(); + _mp4_file = nullptr; } void MP4Muxer::resetTracks() { - _codec_to_trackid.clear(); - _started = false; - _have_video = false; - openMP4(); + MP4MuxerInterface::resetTracks(); + openMP4(_file_name); } -void MP4Muxer::inputFrame(const Frame::Ptr &frame) { +/////////////////////////////////////////// MP4MuxerInterface ///////////////////////////////////////////// + +void MP4MuxerInterface::saveSegment(){ + mp4_writer_save_segment(_mov_writter.get()); +} + +void MP4MuxerInterface::initSegment(){ + mp4_writer_init_segment(_mov_writter.get()); +} + +bool MP4MuxerInterface::haveVideo() const{ + return _have_video; +} + +void MP4MuxerInterface::resetTracks() { + _started = false; + _have_video = false; + _mov_writter = nullptr; + _frameCached.clear(); + _codec_to_trackid.clear(); +} + +void MP4MuxerInterface::inputFrame(const Frame::Ptr &frame) { auto it = _codec_to_trackid.find(frame->getCodecId()); if(it == _codec_to_trackid.end()){ //该Track不存在或初始化失败 @@ -84,24 +108,23 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) { merged.append((char *) &nalu_size, 4); merged.append(frame->data() + frame->prefixSize(), frame->size() - frame->prefixSize()); }); - mov_writer_write_l(_mov_writter.get(), + mp4_writer_write(_mov_writter.get(), track_info.track_id, merged.data(), merged.size(), pts_out, dts_out, - back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0, - 1/*我们合并时已经生成了4个字节的MP4格式start code*/); + back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); } else { //缓存中只有一帧视频 - mov_writer_write_l(_mov_writter.get(), + mp4_writer_write_l(_mov_writter.get(), track_info.track_id, back->data() + back->prefixSize(), back->size() - back->prefixSize(), pts_out, dts_out, back->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0, - 0/*需要生成头4个字节的MP4格式start code*/); + 1/*需要生成头4个字节的MP4格式start code*/); } _frameCached.clear(); } @@ -111,14 +134,13 @@ void MP4Muxer::inputFrame(const Frame::Ptr &frame) { break; default: { track_info.stamp.revise(frame->dts(), frame->pts(), dts_out, pts_out); - mov_writer_write_l(_mov_writter.get(), - track_info.track_id, - frame->data() + frame->prefixSize(), - frame->size() - frame->prefixSize(), - pts_out, - dts_out, - frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0, - 1/*aac或其他类型frame不用添加4个nalu_size的字节*/); + mp4_writer_write(_mov_writter.get(), + track_info.track_id, + frame->data() + frame->prefixSize(), + frame->size() - frame->prefixSize(), + pts_out, + dts_out, + frame->keyFrame() ? MOV_AV_FLAG_KEYFREAME : 0); } break; } @@ -136,7 +158,7 @@ static uint8_t getObject(CodecId codecId){ } } -void MP4Muxer::stampSync(){ +void MP4MuxerInterface::stampSync(){ if(_codec_to_trackid.size() < 2){ return; } @@ -156,7 +178,10 @@ void MP4Muxer::stampSync(){ } } -void MP4Muxer::addTrack(const Track::Ptr &track) { +void MP4MuxerInterface::addTrack(const Track::Ptr &track) { + if (!_mov_writter) { + _mov_writter = createWriter(); + } auto mp4_object = getObject(track->getCodecId()); if (!mp4_object) { WarnL << "MP4录制不支持该编码格式:" << track->getCodecName(); @@ -178,7 +203,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { return; } - auto track_id = mov_writer_add_audio(_mov_writter.get(), + auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(), audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), @@ -199,7 +224,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { return; } - auto track_id = mov_writer_add_audio(_mov_writter.get(), + auto track_id = mp4_writer_add_audio(_mov_writter.get(), mp4_object, audio_track->getAudioChannel(), audio_track->getAudioSampleBit() * audio_track->getAudioChannel(), @@ -232,7 +257,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { return; } - auto track_id = mov_writer_add_video(_mov_writter.get(), + auto track_id = mp4_writer_add_video(_mov_writter.get(), mp4_object, h264_track->getVideoWidth(), h264_track->getVideoHeight(), @@ -267,7 +292,7 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { return; } - auto track_id = mov_writer_add_video(_mov_writter.get(), + auto track_id = mp4_writer_add_video(_mov_writter.get(), mp4_object, h265_track->getVideoWidth(), h265_track->getVideoHeight(), @@ -289,5 +314,54 @@ void MP4Muxer::addTrack(const Track::Ptr &track) { stampSync(); } +/////////////////////////////////////////// MP4MuxerMemory ///////////////////////////////////////////// + +MP4MuxerMemory::MP4MuxerMemory() { + _memory_file = std::make_shared(); +} + +MP4FileIO::Writer MP4MuxerMemory::createWriter() { + return _memory_file->createWriter(MOV_FLAG_SEGMENT, true); +} + +const string &MP4MuxerMemory::getInitSegment(){ + if (_init_segment.empty()) { + initSegment(); + saveSegment(); + _init_segment = _memory_file->getAndClearMemory(); + } + return _init_segment; +} + +void MP4MuxerMemory::resetTracks(){ + MP4MuxerInterface::resetTracks(); + _memory_file = std::make_shared(); + _init_segment.clear(); +} + +void MP4MuxerMemory::inputFrame(const Frame::Ptr &frame){ + if (_init_segment.empty()) { + //尚未生成init segment + return; + } + + bool key_frame = frame->keyFrame(); + if (_ticker.elapsedTime() > 50 || key_frame) { + //遇到关键帧或者超过50ms则切片 + _ticker.resetTime(); + //flush切片 + saveSegment(); + //输出切片数据 + onSegmentData(_memory_file->getAndClearMemory(), frame->dts(), _key_frame); + _key_frame = false; + } + + if (key_frame) { + _key_frame = true; + } + MP4MuxerInterface::inputFrame(frame); +} + + }//namespace mediakit #endif//#ifdef ENABLE_MP4 diff --git a/src/Record/MP4Muxer.h b/src/Record/MP4Muxer.h index f2e3fbd2..479e98d5 100644 --- a/src/Record/MP4Muxer.h +++ b/src/Record/MP4Muxer.h @@ -23,17 +23,16 @@ namespace mediakit{ -class MP4Muxer : public MediaSinkInterface, public MP4File{ +class MP4MuxerInterface : public MediaSinkInterface { public: - typedef std::shared_ptr Ptr; - - MP4Muxer(const char *file); - ~MP4Muxer() override; + MP4MuxerInterface() = default; + ~MP4MuxerInterface() override = default; /** * 添加已经ready状态的track */ - void addTrack(const Track::Ptr & track) override; + void addTrack(const Track::Ptr &track) override; + /** * 输入帧 */ @@ -42,30 +41,112 @@ public: /** * 重置所有track */ - void resetTracks() override ; + void resetTracks() override; + + /** + * 是否包含视频 + */ + bool haveVideo() const; + + /** + * 保存fmp4分片 + */ + void saveSegment(); + + /** + * 创建新切片 + */ + void initSegment(); + +protected: + virtual MP4FileIO::Writer createWriter() = 0; + +private: + void stampSync(); + +private: + bool _started = false; + bool _have_video = false; + MP4FileIO::Writer _mov_writter; + struct track_info { + int track_id = -1; + Stamp stamp; + }; + List _frameCached; + unordered_map _codec_to_trackid; +}; + +class MP4Muxer : public MP4MuxerInterface{ +public: + typedef std::shared_ptr Ptr; + + MP4Muxer(); + ~MP4Muxer() override; + + /** + * 重置所有track + */ + void resetTracks() override; + + /** + * 打开mp4 + * @param file 文件完整路径 + */ + void openMP4(const string &file); /** * 手动关闭文件(对象析构时会自动关闭) */ void closeMP4(); -private: - void openMP4(); - void stampSync(); +protected: + MP4FileIO::Writer createWriter() override; private: - struct track_info { - int track_id = -1; - Stamp stamp; - }; - unordered_map _codec_to_trackid; - List _frameCached; - bool _started = false; - bool _have_video = false; - MP4File::Writer _mov_writter; string _file_name; + MP4FileDisk::Ptr _mp4_file; }; +class MP4MuxerMemory : public MP4MuxerInterface{ +public: + MP4MuxerMemory(); + ~MP4MuxerMemory() override = default; + + /** + * 重置所有track + */ + void resetTracks() override; + + /** + * 输入帧 + */ + void inputFrame(const Frame::Ptr &frame) override; + + /** + * 获取fmp4 init segment + */ + const string &getInitSegment(); + +protected: + /** + * 输出fmp4切片回调函数 + * @param string 切片内容 + * @param stamp 切片末尾时间戳 + * @param key_frame 是否有关键帧 + */ + virtual void onSegmentData(const string &string, uint32_t stamp, bool key_frame) = 0; + +protected: + MP4FileIO::Writer createWriter() override; + +private: + bool _key_frame = false; + Ticker _ticker; + string _init_segment; + MP4FileMemory::Ptr _memory_file; +}; + + }//namespace mediakit #endif//#ifdef ENABLE_MP4 #endif //ZLMEDIAKIT_MP4MUXER_H diff --git a/src/Record/MP4Reader.cpp b/src/Record/MP4Reader.cpp index 1e78c617..ea58e0a3 100644 --- a/src/Record/MP4Reader.cpp +++ b/src/Record/MP4Reader.cpp @@ -29,7 +29,8 @@ MP4Reader::MP4Reader(const string &strVhost,const string &strApp, const string & strFileName = File::absolutePath(strFileName,recordPath); } - _demuxer = std::make_shared(strFileName.data()); + _demuxer = std::make_shared(); + _demuxer->openMP4(strFileName); _mediaMuxer.reset(new MultiMediaSourceMuxer(strVhost, strApp, strId, _demuxer->getDurationMS() / 1000.0, true, true, false, false)); auto tracks = _demuxer->getTracks(false); if(tracks.empty()){ diff --git a/src/Record/MP4Reader.h b/src/Record/MP4Reader.h index bb7e365e..b8348752 100644 --- a/src/Record/MP4Reader.h +++ b/src/Record/MP4Reader.h @@ -35,6 +35,7 @@ public: * 意思是在文件流化结束之前或中断之前,MP4Reader对象是不会被销毁的(不管有没有被外部对象持有) */ void startReadMP4(); + private: //MediaSourceEvent override bool seekTo(MediaSource &sender,uint32_t ui32Stamp) override; @@ -45,15 +46,16 @@ private: uint32_t getCurrentStamp(); void setCurrentStamp(uint32_t ui32Stamp); bool seekTo(uint32_t ui32Stamp); + private: - recursive_mutex _mtx; - MultiMediaSourceMuxer::Ptr _mediaMuxer; + bool _have_video = false; uint32_t _seek_to; + recursive_mutex _mtx; Ticker _seek_ticker; Timer::Ptr _timer; EventPoller::Ptr _poller; MP4Demuxer::Ptr _demuxer; - bool _have_video = false; + MultiMediaSourceMuxer::Ptr _mediaMuxer; }; } /* namespace mediakit */ diff --git a/src/Record/MP4Recorder.cpp b/src/Record/MP4Recorder.cpp index 1f247354..b9922b17 100644 --- a/src/Record/MP4Recorder.cpp +++ b/src/Record/MP4Recorder.cpp @@ -25,10 +25,10 @@ MP4Recorder::MP4Recorder(const string& strPath, const string &strStreamId) { _strPath = strPath; /////record 业务逻辑////// - _info.strAppName = strApp; - _info.strStreamId = strStreamId; - _info.strVhost = strVhost; - _info.strFolder = strPath; + _info.app = strApp; + _info.stream = strStreamId; + _info.vhost = strVhost; + _info.folder = strPath; } MP4Recorder::~MP4Recorder() { closeFile(); @@ -42,26 +42,27 @@ void MP4Recorder::createFile() { auto strFile = _strPath + strDate + "/" + strTime + ".mp4"; /////record 业务逻辑////// - _info.ui64StartedTime = ::time(NULL); - _info.strFileName = strTime + ".mp4"; - _info.strFilePath = strFile; + _info.start_time = ::time(NULL); + _info.file_name = strTime + ".mp4"; + _info.file_path = strFile; GET_CONFIG(string,appName,Record::kAppName); - _info.strUrl = appName + "/" - + _info.strAppName + "/" - + _info.strStreamId + "/" - + strDate + "/" - + strTime + ".mp4"; + _info.url = appName + "/" + + _info.app + "/" + + _info.stream + "/" + + strDate + "/" + + strTime + ".mp4"; try { - _muxer = std::make_shared(strFileTmp.data()); - for(auto &track :_tracks){ + _muxer = std::make_shared(); + _muxer->openMP4(strFileTmp); + for (auto &track :_tracks) { //添加track _muxer->addTrack(track); } _strFileTmp = strFileTmp; _strFile = strFile; _createFileTicker.resetTime(); - }catch(std::exception &ex) { + } catch (std::exception &ex) { WarnL << ex.what(); } } @@ -73,7 +74,7 @@ void MP4Recorder::asyncClose() { auto info = _info; WorkThreadPool::Instance().getExecutor()->async([muxer,strFileTmp,strFile,info]() { //获取文件录制时间,放在关闭mp4之前是为了忽略关闭mp4执行时间 - const_cast(info).ui64TimeLen = ::time(NULL) - info.ui64StartedTime; + const_cast(info).time_len = ::time(NULL) - info.start_time; //关闭mp4非常耗时,所以要放在后台线程执行 muxer->closeMP4(); //临时文件名改成正式文件名,防止mp4未完成时被访问 @@ -81,7 +82,7 @@ void MP4Recorder::asyncClose() { //获取文件大小 struct stat fileData; stat(strFile.data(), &fileData); - const_cast(info).ui64FileSize = fileData.st_size; + const_cast(info).file_size = fileData.st_size; /////record 业务逻辑////// NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastRecordMP4,info); }); diff --git a/src/Record/MP4Recorder.h b/src/Record/MP4Recorder.h index 60f3c561..0270ea62 100644 --- a/src/Record/MP4Recorder.h +++ b/src/Record/MP4Recorder.h @@ -20,24 +20,11 @@ #include "Util/TimeTicker.h" #include "Common/MediaSink.h" #include "MP4Muxer.h" + using namespace toolkit; namespace mediakit { -class MP4Info { -public: - time_t ui64StartedTime; //GMT标准时间,单位秒 - time_t ui64TimeLen;//录像长度,单位秒 - off_t ui64FileSize;//文件大小,单位BYTE - string strFilePath;//文件路径 - string strFileName;//文件名称 - string strFolder;//文件夹路径 - string strUrl;//播放路径 - string strAppName;//应用名称 - string strStreamId;//流ID - string strVhost;//vhost -}; - #ifdef ENABLE_MP4 class MP4Recorder : public MediaSinkInterface{ public: @@ -72,7 +59,7 @@ private: string _strFile; string _strFileTmp; Ticker _createFileTicker; - MP4Info _info; + RecordInfo _info; bool _haveVideo = false; MP4Muxer::Ptr _muxer; list _tracks; diff --git a/src/Record/Recorder.h b/src/Record/Recorder.h index c9ff78f8..b057c728 100644 --- a/src/Record/Recorder.h +++ b/src/Record/Recorder.h @@ -16,6 +16,20 @@ using namespace std; namespace mediakit { class MediaSinkInterface; +class RecordInfo { +public: + time_t start_time; // GMT 标准时间,单位秒 + float time_len; // 录像长度,单位秒 + off_t file_size; // 文件大小,单位 BYTE + string file_path; // 文件路径 + string file_name; // 文件名称 + string folder; // 文件夹路径 + string url; // 播放路径 + string app; // 应用名称 + string stream; // 流 ID + string vhost; // 虚拟主机 +}; + class Recorder{ public: typedef enum { diff --git a/src/Rtp/RtpSelector.cpp b/src/Rtp/RtpSelector.cpp index 8f74e8ce..283dd98f 100644 --- a/src/Rtp/RtpSelector.cpp +++ b/src/Rtp/RtpSelector.cpp @@ -72,31 +72,40 @@ void RtpSelector::createTimer() { } void RtpSelector::delProcess(const string &stream_id,const RtpProcess *ptr) { - lock_guard lck(_mtx_map); - auto it = _map_rtp_process.find(stream_id); - if (it == _map_rtp_process.end()) { - return; + RtpProcess::Ptr process; + { + lock_guard lck(_mtx_map); + auto it = _map_rtp_process.find(stream_id); + if (it == _map_rtp_process.end()) { + return; + } + if (it->second->getProcess().get() != ptr) { + return; + } + process = it->second->getProcess(); + _map_rtp_process.erase(it); } - if (it->second->getProcess().get() != ptr) { - return; - } - auto process = it->second->getProcess(); - _map_rtp_process.erase(it); process->onDetach(); } void RtpSelector::onManager() { - lock_guard lck(_mtx_map); - for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) { - if (it->second->getProcess()->alive()) { - ++it; - continue; + List clear_list; + { + lock_guard lck(_mtx_map); + for (auto it = _map_rtp_process.begin(); it != _map_rtp_process.end();) { + if (it->second->getProcess()->alive()) { + ++it; + continue; + } + WarnL << "RtpProcess timeout:" << it->first; + clear_list.emplace_back(it->second->getProcess()); + it = _map_rtp_process.erase(it); } - WarnL << "RtpProcess timeout:" << it->first; - auto process = it->second->getProcess(); - it = _map_rtp_process.erase(it); - process->onDetach(); } + + clear_list.for_each([](const RtpProcess::Ptr &process) { + process->onDetach(); + }); } RtpSelector::RtpSelector() { diff --git a/src/TS/TSMediaSource.h b/src/TS/TSMediaSource.h new file mode 100644 index 00000000..c7b05f53 --- /dev/null +++ b/src/TS/TSMediaSource.h @@ -0,0 +1,129 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_TSMEDIASOURCE_H +#define ZLMEDIAKIT_TSMEDIASOURCE_H + +#include "Common/MediaSource.h" +using namespace toolkit; +#define TS_GOP_SIZE 512 + +namespace mediakit { + +//TS直播数据包 +class TSPacket : public BufferRaw{ +public: + using Ptr = std::shared_ptr; + + template + TSPacket(ARGS && ...args) : BufferRaw(std::forward(args)...) {}; + ~TSPacket() override = default; + +public: + uint32_t time_stamp = 0; +}; + +//TS直播合并写策略类 +class TSFlushPolicy : public FlushPolicy{ +public: + TSFlushPolicy() = default; + ~TSFlushPolicy() = default; + + uint32_t getStamp(const TSPacket::Ptr &packet) { + return packet->time_stamp; + } +}; + +//TS直播源 +class TSMediaSource : public MediaSource, public RingDelegate, public PacketCache{ +public: + using PoolType = ResourcePool; + using Ptr = std::shared_ptr; + using RingDataType = std::shared_ptr >; + using RingType = RingBuffer; + + TSMediaSource(const string &vhost, + const string &app, + const string &stream_id, + int ring_size = TS_GOP_SIZE) : MediaSource(TS_SCHEMA, vhost, app, stream_id), _ring_size(ring_size) {} + + ~TSMediaSource() override = default; + + /** + * 获取媒体源的环形缓冲 + */ + const RingType::Ptr &getRing() const { + return _ring; + } + + /** + * 获取播放器个数 + */ + int readerCount() override { + return _ring ? _ring->readerCount() : 0; + } + + /** + * 输入TS包 + * @param packet TS包 + * @param key 是否为关键帧第一个包 + */ + void onWrite(const TSPacket::Ptr &packet, bool key) override { + if (!_ring) { + createRing(); + } + if (key) { + _have_video = true; + } + PacketCache::inputPacket(true, packet, key); + } + + /** + * 情况GOP缓存 + */ + void clearCache() override { + PacketCache::clearCache(); + _ring->clearCache(); + } + +private: + void createRing(){ + weak_ptr weak_self = dynamic_pointer_cast(shared_from_this()); + _ring = std::make_shared(_ring_size, [weak_self](int size) { + auto strong_self = weak_self.lock(); + if (!strong_self) { + return; + } + strong_self->onReaderChanged(size); + }); + onReaderChanged(0); + //注册媒体源 + regist(); + } + + /** + * 合并写回调 + * @param packet_list 合并写缓存列队 + * @param key_pos 是否包含关键帧 + */ + void onFlush(std::shared_ptr > &packet_list, bool key_pos) override { + //如果不存在视频,那么就没有存在GOP缓存的意义,所以确保一直清空GOP缓存 + _ring->write(packet_list, _have_video ? key_pos : true); + } + +private: + bool _have_video = false; + int _ring_size; + RingType::Ptr _ring; +}; + + +}//namespace mediakit +#endif //ZLMEDIAKIT_TSMEDIASOURCE_H diff --git a/src/TS/TSMediaSourceMuxer.h b/src/TS/TSMediaSourceMuxer.h new file mode 100644 index 00000000..553e18f4 --- /dev/null +++ b/src/TS/TSMediaSourceMuxer.h @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_TSMEDIASOURCEMUXER_H +#define ZLMEDIAKIT_TSMEDIASOURCEMUXER_H + +#include "TSMediaSource.h" +#include "Record/TsMuxer.h" + +namespace mediakit { + +class TSMediaSourceMuxer : public TsMuxer, public MediaSourceEventInterceptor, + public std::enable_shared_from_this { +public: + using Ptr = std::shared_ptr; + + TSMediaSourceMuxer(const string &vhost, + const string &app, + const string &stream_id) { + _media_src = std::make_shared(vhost, app, stream_id); + _pool.setSize(256); + } + + ~TSMediaSourceMuxer() override = default; + + void setListener(const std::weak_ptr &listener){ + _listener = listener; + _media_src->setListener(shared_from_this()); + } + + int readerCount() const{ + return _media_src->readerCount(); + } + + void onReaderChanged(MediaSource &sender, int size) override { + _enabled = size; + if (!size) { + _clear_cache = true; + } + MediaSourceEventInterceptor::onReaderChanged(sender, size); + } + + void inputFrame(const Frame::Ptr &frame) override { + if (_clear_cache) { + _clear_cache = false; + _media_src->clearCache(); + } + if (_enabled) { + TsMuxer::inputFrame(frame); + } + } + + bool isEnabled() { + //缓存尚未清空时,还允许触发inputFrame函数,以便及时清空缓存 + return _clear_cache ? true : _enabled; + } + +protected: + void onTs(const void *data, int len,uint32_t timestamp,bool is_idr_fast_packet) override{ + if(!data || !len){ + return; + } + TSPacket::Ptr packet = _pool.obtain(); + packet->assign((char *) data, len); + packet->time_stamp = timestamp; + _media_src->onWrite(packet, is_idr_fast_packet); + } + +private: + bool _enabled = true; + bool _clear_cache = false; + TSMediaSource::PoolType _pool; + TSMediaSource::Ptr _media_src; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_TSMEDIASOURCEMUXER_H diff --git a/tests/test_pusher.cpp b/tests/test_pusher.cpp index d64c84ad..fe7f1b0b 100644 --- a/tests/test_pusher.cpp +++ b/tests/test_pusher.cpp @@ -75,14 +75,7 @@ int domain(const string &playUrl, const string &pushUrl) { //拉一个流,生成一个RtmpMediaSource,源的名称是"app/stream" //你也可以以其他方式生成RtmpMediaSource,比如说MP4文件(请查看test_rtmpPusherMp4.cpp代码) MediaInfo info(pushUrl); - bool enable_rtsp = true; - bool enable_rtmp = true; - if(info._schema == RTSP_SCHEMA){ - enable_rtmp = false; - }else if(info._schema == RTMP_SCHEMA){ - enable_rtsp = false; - } - PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",enable_rtsp,enable_rtmp,false,false,-1 , poller)); + PlayerProxy::Ptr player(new PlayerProxy(DEFAULT_VHOST, "app", "stream",false,false,-1 , poller)); //可以指定rtsp拉流方式,支持tcp和udp方式,默认tcp // (*player)[Client::kRtpType] = Rtsp::RTP_UDP; player->play(playUrl.data());