diff --git a/AUTHORS b/AUTHORS index 655a46dd..344cd599 100644 --- a/AUTHORS +++ b/AUTHORS @@ -17,4 +17,16 @@ huohuo <913481084@qq.com> [Xiaofeng Wang](https://github.com/wasphin) [doodoocoder](https://github.com/doodoocoder) [qingci](https://github.com/Colibrow) -Zhou Weimin \ No newline at end of file +Zhou Weimin +[hewenyuan](https://gitee.com/kingyuanyuan) +sunhui +mirs +Kevin Cheng +Liu Jiang +along +qingci +lyg1949 +zhlong +Luke +大裤衩 <3503207480@qq.com> +droid.chow \ No newline at end of file diff --git a/README.md b/README.md index ee1f12e6..c73116e2 100644 --- a/README.md +++ b/README.md @@ -209,6 +209,18 @@ bash build_docker_images.sh [big panda](<2381267071@qq.com>) [tanningzhong](https://github.com/tanningzhong) [hctym1995](https://github.com/hctym1995) +[hewenyuan](https://gitee.com/kingyuanyuan) +[sunhui]() +[mirs](fangpengcheng@bilibili.com>) +[Kevin Cheng](kevin__cheng@outlook.com>) +[Liu Jiang](root@oopy.org>) +[along](alongl@users.noreply.github.com>) +[qingci](xpy66swsry@gmail.com>) +[lyg1949](zh.ghlong@qq.com>) +[zhlong](zh.ghlong@qq.com>) +[Luke](automan@easydarwin.org>) +[大裤衩](3503207480@qq.com>) +[droid.chow](droid.chow@gmail.com>) ## 使用案例 diff --git a/api/source/mk_events_objects.cpp b/api/source/mk_events_objects.cpp index 50bcbff4..8e120c6f 100644 --- a/api/source/mk_events_objects.cpp +++ b/api/source/mk_events_objects.cpp @@ -214,7 +214,7 @@ API_EXPORT int API_CALL mk_media_source_seek_to(const mk_media_source ctx,uint32 API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ctx, const char *dst_url, uint16_t dst_port, const char *ssrc, int is_udp, on_mk_media_source_send_rtp_result cb, void *user_data){ assert(ctx && dst_url && ssrc); MediaSource *src = (MediaSource *)ctx; - src->startSendRtp(dst_url, dst_port, ssrc, is_udp, [cb, user_data](const SockException &ex){ + src->startSendRtp(dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ if (cb) { cb(user_data, ex.getErrCode(), ex.what()); } @@ -224,7 +224,7 @@ API_EXPORT void API_CALL mk_media_source_start_send_rtp(const mk_media_source ct API_EXPORT int API_CALL mk_media_source_stop_send_rtp(const mk_media_source ctx){ assert(ctx); MediaSource *src = (MediaSource *) ctx; - return src->stopSendRtp(); + return src->stopSendRtp(""); } API_EXPORT void API_CALL mk_media_source_find(const char *schema, diff --git a/api/source/mk_media.cpp b/api/source/mk_media.cpp index 53fb024e..d2d4dcc3 100755 --- a/api/source/mk_media.cpp +++ b/api/source/mk_media.cpp @@ -193,7 +193,7 @@ API_EXPORT void API_CALL mk_media_start_send_rtp(mk_media ctx, const char *dst_u assert(ctx && dst_url && ssrc); MediaHelper::Ptr* obj = (MediaHelper::Ptr*) ctx; //sender参数无用 - (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, [cb, user_data](const SockException &ex){ + (*obj)->getChannel()->startSendRtp(*(MediaSource *) 1, dst_url, dst_port, ssrc, is_udp, 0, [cb, user_data](const SockException &ex){ if (cb) { cb(user_data, ex.getErrCode(), ex.what()); } @@ -204,5 +204,5 @@ API_EXPORT int API_CALL mk_media_stop_send_rtp(mk_media ctx){ assert(ctx); MediaHelper::Ptr *obj = (MediaHelper::Ptr *) ctx; //sender参数无用 - return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1); + return (*obj)->getChannel()->stopSendRtp(*(MediaSource *) 1, ""); } \ No newline at end of file diff --git a/api/source/mk_thread.cpp b/api/source/mk_thread.cpp index 08778287..fca1cec6 100644 --- a/api/source/mk_thread.cpp +++ b/api/source/mk_thread.cpp @@ -77,7 +77,7 @@ public: _task->cancel(); } - void start(int ms ,EventPoller &poller){ + void start(uint64_t ms ,EventPoller &poller){ weak_ptr weak_self = shared_from_this(); _task = poller.doDelayTask(ms, [weak_self]() { auto strong_self = weak_self.lock(); diff --git a/build_docker_images.sh b/build_docker_images.sh index 4e8d1620..0ee40722 100644 --- a/build_docker_images.sh +++ b/build_docker_images.sh @@ -4,3 +4,4 @@ docker build -t gemfield/zlmediakit:20.04-runtime-ubuntu18.04 -f docker/ubuntu18 docker build -t gemfield/zlmediakit:20.04-devel-ubuntu18.04 -f docker/ubuntu18.04/Dockerfile.devel . docker build -t gemfield/zlmediakit:20.04-runtime-ubuntu16.04 -f docker/ubuntu16.04/Dockerfile.runtime . docker build -t gemfield/zlmediakit:20.04-devel-ubuntu16.04 -f docker/ubuntu16.04/Dockerfile.devel . +docker build -t gemfield/zlmediakit:centos7-runtime -f docker/centos7/Dockerfile.runtime . diff --git a/docker/centos7/Dockerfile.runtime b/docker/centos7/Dockerfile.runtime new file mode 100644 index 00000000..8f588de5 --- /dev/null +++ b/docker/centos7/Dockerfile.runtime @@ -0,0 +1,131 @@ +ARG Version=7 + +FROM centos:${Version} As build + +ARG HTTP_PROXY=${NO_PROXY} +ARG HTTPS_PROXY=${NO_PROXY} +ARG PKG_CONFIG_VERSION=0.29.2 +ARG CMAKE_VERSION=3.18 +ARG CMAKE_FULL_VERSION=3.18.4 +ARG YASM_VERSION=1.3.0 +ARG NASM_VERSION=2.15.05 +ARG X265_VERSION=3.4 + +RUN yum install -y \ + gcc \ + gcc-c++ \ + kernel-devel \ + kernel-headers \ + openssl \ + openssl-devel \ + git \ + wget \ + which + +WORKDIR /opt + +RUN wget -e "https_proxy=${HTTPS_PROXY}" https://pkgconfig.freedesktop.org/releases/pkg-config-${PKG_CONFIG_VERSION}.tar.gz \ + && tar -zxvf pkg-config-${PKG_CONFIG_VERSION}.tar.gz \ + && cd pkg-config-${PKG_CONFIG_VERSION} \ + && ./configure --with-internal-glib \ + && make -j8 \ + && make install + +RUN wget -e "https_proxy=${HTTPS_PROXY}" https://cmake.org/files/v${CMAKE_VERSION}/cmake-${CMAKE_FULL_VERSION}.tar.gz \ + && tar -zxvf cmake-${CMAKE_FULL_VERSION}.tar.gz \ + && cd cmake-${CMAKE_FULL_VERSION} \ + && ./bootstrap \ + && gmake -j8 \ + && gmake install + +RUN cd /opt \ + && wget -e "https_proxy=${HTTPS_PROXY}" http://www.tortall.net/projects/yasm/releases/yasm-${YASM_VERSION}.tar.gz \ + && tar zxvf yasm-${YASM_VERSION}.tar.gz \ + && cd yasm-${YASM_VERSION} \ + && ./configure \ + && make -j8 \ + && make install + +RUN wget -e "https_proxy=${HTTPS_PROXY}" https://www.nasm.us/pub/nasm/releasebuilds/2.15.05/nasm-${NASM_VERSION}.tar.xz \ + && tar -xvJf nasm-${NASM_VERSION}.tar.xz \ + && cd nasm-${NASM_VERSION} \ + && ./configure --disable-shared --enable-static \ + && make -j8 \ + && make install + +RUN cd /opt \ + && git clone https://code.videolan.org/videolan/x264.git \ + && cd x264 \ + && git checkout -b stable origin/stable \ + && git pull --rebase \ + && ./configure --enable-pic --enable-shared --disable-asm \ + && make -j8 \ + && make install \ + && export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH + +RUN cd /opt \ + && wget -e "https_proxy=${HTTPS_PROXY}" https://github.com/videolan/x265/archive/Release_${X265_VERSION}.tar.gz \ + && tar zxvf Release_${X265_VERSION}.tar.gz \ + && cd x265-Release_${X265_VERSION}/build/linux \ + && cmake ../../source \ + && make -j8 \ + && make install \ + && export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH + +RUN cd /opt \ + && git clone https://gitee.com/xia-chu/FFmpeg.git \ + && cd /opt/FFmpeg \ + && export PKG_CONFIG_PATH=/usr/local/lib/pkgconfig:$PKG_CONFIG_PATH \ + && ./configure \ + --disable-debug \ + --disable-doc \ + --disable-shared \ + --enable-gpl \ + --enable-version3 \ + --enable-static \ + --enable-nonfree \ + --enable-pthreads \ + --enable-libx264 \ + --enable-libx265 \ + --enable-small \ + --pkgconfigdir=/usr/local/lib/pkgconfig \ + --pkg-config-flags="--static" \ + && make -j8 \ + && make install + +RUN cd /opt \ + && git clone --depth 1 https://github.com/xia-chu/ZLMediaKit.git \ + && cd ZLMediaKit \ + && git submodule update --init \ + && mkdir -p build release/linux/Release/ \ + && cd build \ + && cmake -DCMAKE_BUILD_TYPE=Release .. \ + && make -j8 + +RUN mkdir -p /opt/build/opt/zlm/ /opt/build/usr/local/bin/ /opt/build/usr/bin/ /opt/build/usr/local/lib/ /opt/build/etc/localtime \ + && cd /opt/build \ + && /usr/bin/cp -ip /usr/local/lib/libx26* ./usr/local/lib \ + && /usr/bin/cp -ip /usr/local/bin/ffmpeg ./usr/local/bin \ + && /usr/bin/cp -ip /opt/ZLMediaKit/release/linux/Release/MediaServer ./opt/zlm/ \ + && /usr/bin/cp -irp /opt/ZLMediaKit/release/linux/Release/www ./opt/zlm/ \ + && /usr/bin/cp -ip /opt/ZLMediaKit/tests/ssl.p12 ./opt/zlm/ \ + && /usr/bin/cp -ip /usr/bin/which ./usr/bin/ + +FROM centos:${Version} +LABEL maintainer="chengxiaosheng " project-url="https://github.com/xia-chu/ZLMediaKit" description="一个基于C++11的高性能运营级流媒体服务框架" + +EXPOSE 9000/tcp \ + 1935/tcp \ + 19350/tcp \ + 554/tcp \ + 322/tcp \ + 80/tcp \ + 443/tcp \ + 10000/udp \ + 10000/tcp + +WORKDIR /opt/zlm +VOLUME [ "/opt/zlm/conf/","/opt/zlm/log/","opt/zlm/ffmpeg/"] +COPY --from=build /opt/build / +ENV LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH TZ=Asia/Shanghai +CMD ./MediaServer -c ./conf/config.ini \ No newline at end of file diff --git a/postman/ZLMediaKit.postman_collection.json b/postman/ZLMediaKit.postman_collection.json index 48d93811..f4c88c3e 100644 --- a/postman/ZLMediaKit.postman_collection.json +++ b/postman/ZLMediaKit.postman_collection.json @@ -1023,6 +1023,68 @@ }, "response": [] }, + { + "name": "暂停RTP超时检查(pauseRtpCheck)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/pauseRtpCheck?secret={{ZLMediaKit_secret}}&stream_id=test", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "pauseRtpCheck" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "stream_id", + "value": "test", + "description": "该端口绑定的流id" + } + ] + } + }, + "response": [] + }, + { + "name": "恢复RTP超时检查(resumeRtpCheck)", + "request": { + "method": "GET", + "header": [], + "url": { + "raw": "{{ZLMediaKit_URL}}/index/api/resumeRtpCheck?secret={{ZLMediaKit_secret}}&stream_id=test", + "host": [ + "{{ZLMediaKit_URL}}" + ], + "path": [ + "index", + "api", + "resumeRtpCheck" + ], + "query": [ + { + "key": "secret", + "value": "{{ZLMediaKit_secret}}", + "description": "api操作密钥(配置文件配置),如果操作ip是127.0.0.1,则不需要此参数" + }, + { + "key": "stream_id", + "value": "test", + "description": "该端口绑定的流id" + } + ] + } + }, + "response": [] + }, { "name": "获取RTP服务器列表(listRtpServer)", "request": { @@ -1088,7 +1150,7 @@ { "key": "ssrc", "value": "1", - "description": "rtp的ssrc" + "description": "rtp推流的ssrc,ssrc不同时,可以推流到多个上级服务器" }, { "key": "dst_url", @@ -1104,6 +1166,12 @@ "key": "is_udp", "value": "0", "description": "是否为udp模式,否则为tcp模式" + }, + { + "key": "src_port", + "value": "0", + "description": "指定tcp/udp客户端使用的本地端口,0时为随机端口,该参数非必选参数,不传时为随机端口。", + "disabled": true } ] } @@ -1145,6 +1213,12 @@ "key": "stream", "value": "obs", "description": "流id,例如 obs" + }, + { + "key": "ssrc", + "value": "", + "description": "根据ssrc关停某路rtp推流,不传时关闭所有推流", + "disabled": true } ] } diff --git a/server/WebApi.cpp b/server/WebApi.cpp old mode 100644 new mode 100755 index 68cd38c4..85a37fdd --- a/server/WebApi.cpp +++ b/server/WebApi.cpp @@ -45,15 +45,6 @@ using namespace toolkit; using namespace mediakit; namespace API { -typedef enum { - Exception = -400,//代码抛异常 - InvalidArgs = -300,//参数不合法 - SqlFailed = -200,//sql执行失败 - AuthFailed = -100,//鉴权失败 - OtherFailed = -1,//业务代码执行失败, - Success = 0//执行成功 -} ApiErr; - #define API_FIELD "api." const string kApiDebug = API_FIELD"apiDebug"; const string kSecret = API_FIELD"secret"; @@ -68,42 +59,7 @@ static onceToken token([]() { }); }//namespace API - -class ApiRetException: public std::runtime_error { -public: - ApiRetException(const char *str = "success" ,int code = API::Success):runtime_error(str){ - _code = code; - } - ~ApiRetException() = default; - int code(){ return _code; } -private: - int _code; -}; - -class AuthException : public ApiRetException { -public: - AuthException(const char *str):ApiRetException(str,API::AuthFailed){} - ~AuthException() = default; -}; - -class InvalidArgsException: public ApiRetException { -public: - InvalidArgsException(const char *str):ApiRetException(str,API::InvalidArgs){} - ~InvalidArgsException() = default; -}; - -class SuccessException: public ApiRetException { -public: - SuccessException():ApiRetException("success",API::Success){} - ~SuccessException() = default; -}; - -#define API_ARGS1 SockInfo &sender,HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val -#define API_ARGS2 API_ARGS1, const HttpSession::HttpResponseInvoker &invoker -#define API_ARGS_VALUE1 sender,headerIn,headerOut,allArgs,val - -typedef map ApiArgsType; -typedef function HttpApi; +using HttpApi = function; //http api列表 static map s_map_api; @@ -123,7 +79,7 @@ static void responseApi(int code, const string &msg, const HttpSession::HttpResp static ApiArgsType getAllArgs(const Parser &parser); -static HttpApi toApi(const function &cb) { +static HttpApi toApi(const function &cb) { return [cb](const Parser &parser, const HttpSession::HttpResponseInvoker &invoker, SockInfo &sender) { GET_CONFIG(string, charSet, Http::kCharSet); HttpSession::KeyValue headerOut; @@ -132,26 +88,62 @@ static HttpApi toApi(const function &cb) { Json::Value val; val["code"] = API::Success; + //参数解析成map auto args = getAllArgs(parser); cb(sender, parser.getHeader(), headerOut, args, val, invoker); }; } -static HttpApi toApi(const function &cb) { - return toApi([cb](API_ARGS2) { - cb(API_ARGS_VALUE1); +static HttpApi toApi(const function &cb) { + return toApi([cb](API_ARGS_MAP_ASYNC) { + cb(API_ARGS_VALUE); invoker("200 OK", headerOut, val.toStyledString()); }); } -template -static void api_regist(const string &api_path, FUNC &&func) { - s_map_api.emplace(api_path, toApi(std::move(func))); +static HttpApi toApi(const function &cb) { + return [cb](const Parser &parser, const HttpSession::HttpResponseInvoker &invoker, SockInfo &sender) { + GET_CONFIG(string, charSet, Http::kCharSet); + HttpSession::KeyValue headerOut; + headerOut["Content-Type"] = string("application/json; charset=") + charSet; + + Json::Value out; + out["code"] = API::Success; + + if (parser["Content-Type"].find("application/json") == string::npos) { + throw InvalidArgsException("该接口只支持json格式的请求"); + } + //参数解析成json对象然后处理 + Json::Value in; + Json::Reader reader; + reader.parse(parser.Content(), in); + + cb(sender, parser.getHeader(), headerOut, in, out, invoker); + }; } -#define api_regist1 api_regist -#define api_regist2 api_regist +static HttpApi toApi(const function &cb) { + return toApi([cb](API_ARGS_JSON_ASYNC) { + cb(API_ARGS_VALUE); + invoker("200 OK", headerOut, val.toStyledString()); + }); +} +void api_regist(const string &api_path, const function &func) { + s_map_api.emplace(api_path, toApi(func)); +} + +void api_regist(const string &api_path, const function &func) { + s_map_api.emplace(api_path, toApi(func)); +} + +void api_regist(const string &api_path, const function &func) { + s_map_api.emplace(api_path, toApi(func)); +} + +void api_regist(const string &api_path, const function &func) { + s_map_api.emplace(api_path, toApi(func)); +} //获取HTTP请求中url参数、content参数 static ApiArgsType getAllArgs(const Parser &parser) { @@ -240,29 +232,6 @@ static inline void addHttpListener(){ }); } -template -bool checkArgs(Args &&args,First &&first){ - return !args[first].empty(); -} - -template -bool checkArgs(Args &&args,First &&first,KeyTypes && ...keys){ - return !args[first].empty() && checkArgs(std::forward(args),std::forward(keys)...); -} - -#define CHECK_ARGS(...) \ - if(!checkArgs(allArgs,##__VA_ARGS__)){ \ - throw InvalidArgsException("缺少必要参数:" #__VA_ARGS__); \ - } - -#define CHECK_SECRET() \ - if(sender.get_peer_ip() != "127.0.0.1"){ \ - CHECK_ARGS("secret"); \ - if(api_secret != allArgs["secret"]){ \ - throw AuthException("secret错误"); \ - } \ - } - //拉流代理器列表 static unordered_map s_proxyMap; static recursive_mutex s_proxyMapMtx; @@ -296,7 +265,7 @@ void installWebApi() { //获取线程负载 //测试url http://127.0.0.1/index/api/getThreadsLoad - api_regist2("/index/api/getThreadsLoad",[](API_ARGS2){ + api_regist("/index/api/getThreadsLoad",[](API_ARGS_MAP_ASYNC){ EventPollerPool::Instance().getExecutorDelay([invoker, headerOut](const vector &vecDelay) { Value val; auto vec = EventPollerPool::Instance().getExecutorLoad(); @@ -314,7 +283,7 @@ void installWebApi() { //获取后台工作线程负载 //测试url http://127.0.0.1/index/api/getWorkThreadsLoad - api_regist2("/index/api/getWorkThreadsLoad", [](API_ARGS2){ + api_regist("/index/api/getWorkThreadsLoad", [](API_ARGS_MAP_ASYNC){ WorkThreadPool::Instance().getExecutorDelay([invoker, headerOut](const vector &vecDelay) { Value val; auto vec = WorkThreadPool::Instance().getExecutorLoad(); @@ -332,7 +301,7 @@ void installWebApi() { //获取服务器配置 //测试url http://127.0.0.1/index/api/getServerConfig - api_regist1("/index/api/getServerConfig",[](API_ARGS1){ + api_regist("/index/api/getServerConfig",[](API_ARGS_MAP){ CHECK_SECRET(); Value obj; for (auto &pr : mINI::Instance()) { @@ -344,7 +313,7 @@ void installWebApi() { //设置服务器配置 //测试url(比如关闭http api调试) http://127.0.0.1/index/api/setServerConfig?api.apiDebug=0 //你也可以通过http post方式传参,可以通过application/x-www-form-urlencoded或application/json方式传参 - api_regist1("/index/api/setServerConfig",[](API_ARGS1){ + api_regist("/index/api/setServerConfig",[](API_ARGS_MAP){ CHECK_SECRET(); auto &ini = mINI::Instance(); int changed = API::Success; @@ -368,7 +337,7 @@ void installWebApi() { }); - static auto s_get_api_list = [](API_ARGS1){ + static auto s_get_api_list = [](API_ARGS_MAP){ CHECK_SECRET(); for(auto &pr : s_map_api){ val["data"].append(pr.first); @@ -377,20 +346,20 @@ void installWebApi() { //获取服务器api列表 //测试url http://127.0.0.1/index/api/getApiList - api_regist1("/index/api/getApiList",[](API_ARGS1){ - s_get_api_list(API_ARGS_VALUE1); + api_regist("/index/api/getApiList",[](API_ARGS_MAP){ + s_get_api_list(API_ARGS_VALUE); }); //获取服务器api列表 //测试url http://127.0.0.1/index/ - api_regist1("/index/",[](API_ARGS1){ - s_get_api_list(API_ARGS_VALUE1); + api_regist("/index/",[](API_ARGS_MAP){ + s_get_api_list(API_ARGS_VALUE); }); #if !defined(_WIN32) //重启服务器,只有Daemon方式才能重启,否则是直接关闭! //测试url http://127.0.0.1/index/api/restartServer - api_regist1("/index/api/restartServer",[](API_ARGS1){ + api_regist("/index/api/restartServer",[](API_ARGS_MAP){ CHECK_SECRET(); EventPollerPool::Instance().getPoller()->doDelayTask(1000,[](){ //尝试正常退出 @@ -468,7 +437,7 @@ void installWebApi() { //测试url0(获取所有流) http://127.0.0.1/index/api/getMediaList //测试url1(获取虚拟主机为"__defaultVost__"的流) http://127.0.0.1/index/api/getMediaList?vhost=__defaultVost__ //测试url2(获取rtsp类型的流) http://127.0.0.1/index/api/getMediaList?schema=rtsp - api_regist1("/index/api/getMediaList",[](API_ARGS1){ + api_regist("/index/api/getMediaList",[](API_ARGS_MAP){ CHECK_SECRET(); //获取所有MediaSource列表 MediaSource::for_each_media([&](const MediaSource::Ptr &media){ @@ -486,14 +455,14 @@ void installWebApi() { }); //测试url http://127.0.0.1/index/api/isMediaOnline?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs - api_regist1("/index/api/isMediaOnline",[](API_ARGS1){ + api_regist("/index/api/isMediaOnline",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); val["online"] = (bool) (MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"])); }); //测试url http://127.0.0.1/index/api/getMediaInfo?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs - api_regist1("/index/api/getMediaInfo",[](API_ARGS1){ + api_regist("/index/api/getMediaInfo",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); auto src = MediaSource::find(allArgs["schema"],allArgs["vhost"],allArgs["app"],allArgs["stream"]); @@ -508,7 +477,7 @@ void installWebApi() { //主动关断流,包括关断拉流、推流 //测试url http://127.0.0.1/index/api/close_stream?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 - api_regist1("/index/api/close_stream",[](API_ARGS1){ + api_regist("/index/api/close_stream",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); //踢掉推流器 @@ -530,7 +499,7 @@ void installWebApi() { //批量主动关断流,包括关断拉流、推流 //测试url http://127.0.0.1/index/api/close_streams?schema=rtsp&vhost=__defaultVhost__&app=live&stream=obs&force=1 - api_regist1("/index/api/close_streams",[](API_ARGS1){ + api_regist("/index/api/close_streams",[](API_ARGS_MAP){ CHECK_SECRET(); //筛选命中个数 int count_hit = 0; @@ -566,7 +535,7 @@ void installWebApi() { //获取所有TcpSession列表信息 //可以根据本地端口和远端ip来筛选 //测试url(筛选某端口下的tcp会话) http://127.0.0.1/index/api/getAllSession?local_port=1935 - api_regist1("/index/api/getAllSession",[](API_ARGS1){ + api_regist("/index/api/getAllSession",[](API_ARGS_MAP){ CHECK_SECRET(); Value jsession; uint16_t local_port = allArgs["local_port"].as(); @@ -591,7 +560,7 @@ void installWebApi() { //断开tcp连接,比如说可以断开rtsp、rtmp播放器等 //测试url http://127.0.0.1/index/api/kick_session?id=123456 - api_regist1("/index/api/kick_session",[](API_ARGS1){ + api_regist("/index/api/kick_session",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("id"); //踢掉tcp会话 @@ -605,7 +574,7 @@ void installWebApi() { //批量断开tcp连接,比如说可以断开rtsp、rtmp播放器等 //测试url http://127.0.0.1/index/api/kick_sessions?local_port=1935 - api_regist1("/index/api/kick_sessions",[](API_ARGS1){ + api_regist("/index/api/kick_sessions",[](API_ARGS_MAP){ CHECK_SECRET(); uint16_t local_port = allArgs["local_port"].as(); string &peer_ip = allArgs["peer_ip"]; @@ -686,7 +655,7 @@ void installWebApi() { //动态添加rtsp/rtmp推流代理 //测试url http://127.0.0.1/index/api/addStreamPusherProxy?schema=rtmp&vhost=__defaultVhost__&app=proxy&stream=0&dst_url=rtmp://127.0.0.1/live/obs - api_regist2("/index/api/addStreamPusherProxy", [](API_ARGS2) { + api_regist("/index/api/addStreamPusherProxy", [](API_ARGS_MAP_ASYNC) { CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); @@ -730,7 +699,7 @@ void installWebApi() { //关闭推流代理 //测试url http://127.0.0.1/index/api/delStreamPusherProxy?key=__defaultVhost__/proxy/0 - api_regist1("/index/api/delStreamPusherProxy",[](API_ARGS1){ + api_regist("/index/api/delStreamPusherProxy",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("key"); lock_guard lck(s_proxyPusherMapMtx); @@ -775,7 +744,7 @@ void installWebApi() { player->play(url); }; - api_regist1("/index/api/getSourceStreamInfo",[](API_ARGS1){ + api_regist("/index/api/getSourceStreamInfo",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("schema","vhost","app","stream"); @@ -797,7 +766,7 @@ void installWebApi() { //动态添加rtsp/rtmp拉流代理 //测试url http://127.0.0.1/index/api/addStreamProxy?vhost=__defaultVhost__&app=proxy&enable_rtsp=1&enable_rtmp=1&stream=0&url=rtmp://127.0.0.1/live/obs - api_regist2("/index/api/addStreamProxy",[](API_ARGS2){ + api_regist("/index/api/addStreamProxy",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("vhost","app","stream","url"); addStreamProxy(allArgs["vhost"], @@ -820,7 +789,7 @@ void installWebApi() { //关闭拉流代理 //测试url http://127.0.0.1/index/api/delStreamProxy?key=__defaultVhost__/proxy/0 - api_regist1("/index/api/delStreamProxy",[](API_ARGS1){ + api_regist("/index/api/delStreamProxy",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("key"); lock_guard lck(s_proxyMapMtx); @@ -860,7 +829,7 @@ void installWebApi() { //动态添加rtsp/rtmp拉流代理 //测试url http://127.0.0.1/index/api/addFFmpegSource?src_url=http://live.hkstv.hk.lxdns.com/live/hks2/playlist.m3u8&dst_url=rtmp://127.0.0.1/live/hks2&timeout_ms=10000 - api_regist2("/index/api/addFFmpegSource",[](API_ARGS2){ + api_regist("/index/api/addFFmpegSource",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("src_url","dst_url","timeout_ms"); auto src_url = allArgs["src_url"]; @@ -881,34 +850,24 @@ void installWebApi() { }); }); - - static auto api_delFFmpegSource = [](API_ARGS1){ + //关闭拉流代理 + //测试url http://127.0.0.1/index/api/delFFmepgSource?key=key + api_regist("/index/api/delFFmpegSource",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("key"); lock_guard lck(s_ffmpegMapMtx); val["data"]["flag"] = s_ffmpegMap.erase(allArgs["key"]) == 1; - }; - - //关闭拉流代理 - //测试url http://127.0.0.1/index/api/delFFmepgSource?key=key - api_regist1("/index/api/delFFmpegSource",[](API_ARGS1){ - api_delFFmpegSource(API_ARGS_VALUE1); - }); - - //此处为了兼容之前的拼写错误 - api_regist1("/index/api/delFFmepgSource",[](API_ARGS1){ - api_delFFmpegSource(API_ARGS_VALUE1); }); //新增http api下载可执行程序文件接口 //测试url http://127.0.0.1/index/api/downloadBin - api_regist2("/index/api/downloadBin",[](API_ARGS2){ + api_regist("/index/api/downloadBin",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); invoker.responseFile(headerIn,StrCaseMap(),exePath()); }); #if defined(ENABLE_RTPPROXY) - api_regist1("/index/api/getRtpInfo",[](API_ARGS1){ + api_regist("/index/api/getRtpInfo",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id"); @@ -924,7 +883,7 @@ void installWebApi() { val["local_ip"] = process->get_local_ip(); }); - api_regist1("/index/api/setRtpPause", [](API_ARGS1){ + api_regist("/index/api/setRtpPause", [](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id", "pause"); @@ -938,7 +897,7 @@ void installWebApi() { process->setRtpPause(pause); }); - api_regist1("/index/api/openRtpServer",[](API_ARGS1){ + api_regist("/index/api/openRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("port", "enable_tcp", "stream_id"); @@ -964,7 +923,7 @@ void installWebApi() { val["port"] = server->getPort(); }); - api_regist1("/index/api/closeRtpServer",[](API_ARGS1){ + api_regist("/index/api/closeRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("stream_id"); @@ -979,7 +938,7 @@ void installWebApi() { val["hit"] = 1; }); - api_regist1("/index/api/listRtpServer",[](API_ARGS1){ + api_regist("/index/api/listRtpServer",[](API_ARGS_MAP){ CHECK_SECRET(); lock_guard lck(s_rtpServerMapMtx); @@ -991,7 +950,7 @@ void installWebApi() { } }); - api_regist2("/index/api/startSendRtp",[](API_ARGS2){ + api_regist("/index/api/startSendRtp",[](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream", "ssrc", "dst_url", "dst_port", "is_udp"); @@ -1000,7 +959,8 @@ void installWebApi() { throw ApiRetException("该媒体流不存在", API::OtherFailed); } - src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], [val, headerOut, invoker](const SockException &ex){ + //src_port为空时,则随机本地端口 + src->startSendRtp(allArgs["dst_url"], allArgs["dst_port"], allArgs["ssrc"], allArgs["is_udp"], allArgs["src_port"], [val, headerOut, invoker](const SockException &ex){ if (ex) { const_cast(val)["code"] = API::OtherFailed; const_cast(val)["msg"] = ex.what(); @@ -1009,7 +969,7 @@ void installWebApi() { }); }); - api_regist1("/index/api/stopSendRtp",[](API_ARGS1){ + api_regist("/index/api/stopSendRtp",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream"); @@ -1018,16 +978,46 @@ void installWebApi() { throw ApiRetException("该媒体流不存在", API::OtherFailed); } - if (!src->stopSendRtp()) { + //ssrc如果为空,关闭全部 + if (!src->stopSendRtp(allArgs["ssrc"])) { throw ApiRetException("尚未开始推流,停止失败", API::OtherFailed); } }); + api_regist("/index/api/pauseRtpCheck", [](API_ARGS_MAP) { + CHECK_SECRET(); + CHECK_ARGS("stream_id"); + //只是暂停流的检查,流媒体服务器做为流负载服务,收流就转发,RTSP/RTMP有自己暂停协议 + lock_guard lck(s_rtpServerMapMtx); + auto it = s_rtpServerMap.find(allArgs["stream_id"]); + if (it == s_rtpServerMap.end()) { + val["hit"] = 0; + return; + } + auto server = it->second; + server->pauseRtpCheck(); + val["hit"] = 1; + }); + + api_regist("/index/api/resumeRtpCheck", [](API_ARGS_MAP) { + CHECK_SECRET(); + CHECK_ARGS("stream_id"); + + lock_guard lck(s_rtpServerMapMtx); + auto it = s_rtpServerMap.find(allArgs["stream_id"]); + if (it == s_rtpServerMap.end()) { + val["hit"] = 0; + return; + } + auto server = it->second; + server->resumeRtpCheck(); + val["hit"] = 1; + }); #endif//ENABLE_RTPPROXY // 开始录制hls或MP4 - api_regist1("/index/api/startRecord",[](API_ARGS1){ + api_regist("/index/api/startRecord",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("type","vhost","app","stream"); auto result = Recorder::startRecord((Recorder::type) allArgs["type"].as(), @@ -1041,7 +1031,7 @@ void installWebApi() { }); // 停止录制hls或MP4 - api_regist1("/index/api/stopRecord",[](API_ARGS1){ + api_regist("/index/api/stopRecord",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("type","vhost","app","stream"); auto result = Recorder::stopRecord((Recorder::type) allArgs["type"].as(), @@ -1054,7 +1044,7 @@ void installWebApi() { }); // 获取hls或MP4录制状态 - api_regist1("/index/api/isRecording",[](API_ARGS1){ + api_regist("/index/api/isRecording",[](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("type","vhost","app","stream"); val["status"] = Recorder::isRecording((Recorder::type) allArgs["type"].as(), @@ -1065,7 +1055,7 @@ void installWebApi() { //获取录像文件夹列表或mp4文件列表 //http://127.0.0.1/index/api/getMp4RecordFile?vhost=__defaultVhost__&app=live&stream=ss&period=2020-01 - api_regist1("/index/api/getMp4RecordFile", [](API_ARGS1){ + api_regist("/index/api/getMp4RecordFile", [](API_ARGS_MAP){ CHECK_SECRET(); CHECK_ARGS("vhost", "app", "stream"); auto record_path = Recorder::getRecordPath(Recorder::type_mp4, allArgs["vhost"], allArgs["app"],allArgs["stream"]); @@ -1120,7 +1110,7 @@ void installWebApi() { //获取截图缓存或者实时截图 //http://127.0.0.1/index/api/getSnap?url=rtmp://127.0.0.1/record/robot.mp4&timeout_sec=10&expire_sec=3 - api_regist2("/index/api/getSnap", [](API_ARGS2){ + api_regist("/index/api/getSnap", [](API_ARGS_MAP_ASYNC){ CHECK_SECRET(); CHECK_ARGS("url", "timeout_sec", "expire_sec"); GET_CONFIG(string, snap_root, API::kSnapRoot); @@ -1183,7 +1173,7 @@ void installWebApi() { }); ////////////以下是注册的Hook API//////////// - api_regist1("/index/hook/on_publish",[](API_ARGS1){ + api_regist("/index/hook/on_publish",[](API_ARGS_MAP){ //开始推流事件 //转换成rtsp或rtmp val["enableRtxp"] = true; @@ -1193,21 +1183,21 @@ void installWebApi() { val["enableMP4"] = false; }); - api_regist1("/index/hook/on_play",[](API_ARGS1){ + api_regist("/index/hook/on_play",[](API_ARGS_MAP){ //开始播放事件 }); - api_regist1("/index/hook/on_flow_report",[](API_ARGS1){ + api_regist("/index/hook/on_flow_report",[](API_ARGS_MAP){ //流量统计hook api }); - api_regist1("/index/hook/on_rtsp_realm",[](API_ARGS1){ + api_regist("/index/hook/on_rtsp_realm",[](API_ARGS_MAP){ //rtsp是否需要鉴权,默认需要鉴权 val["code"] = API::Success; val["realm"] = "zlmediakit_reaml"; }); - api_regist1("/index/hook/on_rtsp_auth",[](API_ARGS1){ + api_regist("/index/hook/on_rtsp_auth",[](API_ARGS_MAP){ //rtsp鉴权密码,密码等于用户名 //rtsp可以有双重鉴权!后面还会触发on_play事件 CHECK_ARGS("user_name"); @@ -1216,13 +1206,13 @@ void installWebApi() { val["passwd"] = allArgs["user_name"].data(); }); - api_regist1("/index/hook/on_stream_changed",[](API_ARGS1){ + api_regist("/index/hook/on_stream_changed",[](API_ARGS_MAP){ //媒体注册或反注册事件 }); #if !defined(_WIN32) - api_regist2("/index/hook/on_stream_not_found_ffmpeg",[](API_ARGS2){ + api_regist("/index/hook/on_stream_not_found_ffmpeg",[](API_ARGS_MAP_ASYNC){ //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流 CHECK_SECRET(); CHECK_ARGS("vhost","app","stream"); @@ -1254,7 +1244,7 @@ void installWebApi() { }); #endif//!defined(_WIN32) - api_regist2("/index/hook/on_stream_not_found",[](API_ARGS2){ + api_regist("/index/hook/on_stream_not_found",[](API_ARGS_MAP_ASYNC){ //媒体未找到事件,我们都及时拉流hks作为替代品,目的是为了测试按需拉流 CHECK_SECRET(); CHECK_ARGS("vhost","app","stream"); @@ -1278,19 +1268,19 @@ void installWebApi() { }); }); - api_regist1("/index/hook/on_record_mp4",[](API_ARGS1){ + api_regist("/index/hook/on_record_mp4",[](API_ARGS_MAP){ //录制mp4分片完毕事件 }); - api_regist1("/index/hook/on_record_hls",[](API_ARGS1){ + api_regist("/index/hook/on_record_hls",[](API_ARGS_MAP){ //录制hls分片完毕事件 }); - api_regist1("/index/hook/on_shell_login",[](API_ARGS1){ + api_regist("/index/hook/on_shell_login",[](API_ARGS_MAP){ //shell登录调试事件 }); - api_regist1("/index/hook/on_stream_none_reader",[](API_ARGS1){ + api_regist("/index/hook/on_stream_none_reader",[](API_ARGS_MAP){ //无人观看流默认关闭 val["close"] = true; }); @@ -1300,7 +1290,7 @@ void installWebApi() { return true; }; - api_regist1("/index/hook/on_http_access",[](API_ARGS1){ + api_regist("/index/hook/on_http_access",[](API_ARGS_MAP){ //在这里根据allArgs["params"](url参数)来判断该http客户端是否有权限访问该文件 if(!checkAccess(allArgs["params"])){ //无访问权限 @@ -1321,11 +1311,9 @@ void installWebApi() { }); - api_regist1("/index/hook/on_server_started",[](API_ARGS1){ + api_regist("/index/hook/on_server_started",[](API_ARGS_MAP){ //服务器重启报告 }); - - } void unInstallWebApi(){ diff --git a/server/WebApi.h b/server/WebApi.h old mode 100644 new mode 100755 index cebe2b2a..0d828572 --- a/server/WebApi.h +++ b/server/WebApi.h @@ -12,10 +12,21 @@ #define ZLMEDIAKIT_WEBAPI_H #include +#include +#include "jsoncpp/json.h" +#include "Common/Parser.h" +#include "Network/Socket.h" +#include "Http/HttpSession.h" + using namespace std; +using namespace Json; +using namespace toolkit; +using namespace mediakit; + +//配置文件路径 +extern string g_ini_file; namespace mediakit { - ////////////RTSP服务器配置/////////// namespace Rtsp { extern const string kPort; @@ -25,14 +36,94 @@ extern const string kPort; namespace Rtmp { extern const string kPort; } //namespace RTMP - } // namespace mediakit +namespace API { +typedef enum { + NotFound = -500,//未找到 + Exception = -400,//代码抛异常 + InvalidArgs = -300,//参数不合法 + SqlFailed = -200,//sql执行失败 + AuthFailed = -100,//鉴权失败 + OtherFailed = -1,//业务代码执行失败, + Success = 0//执行成功 +} ApiErr; +}//namespace API + +class ApiRetException: public std::runtime_error { +public: + ApiRetException(const char *str = "success" ,int code = API::Success):runtime_error(str){ + _code = code; + } + ~ApiRetException() = default; + int code(){ return _code; } +private: + int _code; +}; + +class AuthException : public ApiRetException { +public: + AuthException(const char *str):ApiRetException(str,API::AuthFailed){} + ~AuthException() = default; +}; + +class InvalidArgsException: public ApiRetException { +public: + InvalidArgsException(const char *str):ApiRetException(str,API::InvalidArgs){} + ~InvalidArgsException() = default; +}; + +class SuccessException: public ApiRetException { +public: + SuccessException():ApiRetException("success",API::Success){} + ~SuccessException() = default; +}; + +using ApiArgsType = map; + +#define API_ARGS_MAP SockInfo &sender, HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, ApiArgsType &allArgs, Json::Value &val +#define API_ARGS_MAP_ASYNC API_ARGS_MAP, const HttpSession::HttpResponseInvoker &invoker +#define API_ARGS_JSON SockInfo &sender, HttpSession::KeyValue &headerIn, HttpSession::KeyValue &headerOut, Json::Value &allArgs, Json::Value &val +#define API_ARGS_JSON_ASYNC API_ARGS_JSON, const HttpSession::HttpResponseInvoker &invoker +#define API_ARGS_VALUE sender, headerIn, headerOut, allArgs, val + +//注册http请求参数是map类型的http api +void api_regist(const string &api_path, const function &func); +//注册http请求参数是map类型,但是可以异步回复的的http api +void api_regist(const string &api_path, const function &func); + +//注册http请求参数是Json::Value类型的http api(可以支持多级嵌套的json参数对象) +void api_regist(const string &api_path, const function &func); +//注册http请求参数是Json::Value类型,但是可以异步回复的的http api +void api_regist(const string &api_path, const function &func); + +template +bool checkArgs(Args &&args, First &&first) { + return !args[first].empty(); +} + +template +bool checkArgs(Args &&args, First &&first, KeyTypes &&...keys) { + return !args[first].empty() && checkArgs(std::forward(args), std::forward(keys)...); +} + +//检查http参数是否为空的宏 +#define CHECK_ARGS(...) \ + if(!checkArgs(allArgs,##__VA_ARGS__)){ \ + throw InvalidArgsException("缺少必要参数:" #__VA_ARGS__); \ + } + +//检查http参数中是否附带secret密钥的宏,127.0.0.1的ip不检查密钥 +#define CHECK_SECRET() \ + if(sender.get_peer_ip() != "127.0.0.1"){ \ + CHECK_ARGS("secret"); \ + if(api_secret != allArgs["secret"]){ \ + throw AuthException("secret错误"); \ + } \ + } void installWebApi(); void unInstallWebApi(); -//配置文件路径 -extern string g_ini_file; class ProxyPusherInfo { public: diff --git a/server/WebHook.cpp b/server/WebHook.cpp old mode 100644 new mode 100755 index 9bedf4c8..3bc82c14 --- a/server/WebHook.cpp +++ b/server/WebHook.cpp @@ -9,9 +9,7 @@ */ #include -#include "jsoncpp/json.h" #include "Util/logger.h" -#include "Util/util.h" #include "Util/onceToken.h" #include "Util/NoticeCenter.h" #include "Common/config.h" @@ -25,21 +23,9 @@ #include "WebApi.h" #include "Util/base64.h" -using namespace Json; using namespace toolkit; using namespace mediakit; - -//支持json或urlencoded方式传输参数 -#define JSON_ARGS - -#ifdef JSON_ARGS -typedef Value ArgsType; -#else -typedef HttpArgs ArgsType; -#endif - - namespace Hook { #define HOOK_FIELD "hook." @@ -134,35 +120,35 @@ const char *getContentType(const HttpArgs &value){ return "application/x-www-form-urlencoded"; } -static void do_http_hook(const string &url,const ArgsType &body,const function &fun){ - GET_CONFIG(string,mediaServerId,General::kMediaServerId); - const_cast(body)["mediaServerId"] = mediaServerId; +void do_http_hook(const string &url,const ArgsType &body,const function &func){ + GET_CONFIG(string, mediaServerId, General::kMediaServerId); + GET_CONFIG(float, hook_timeoutSec, Hook::kTimeoutSec); - GET_CONFIG(float,hook_timeoutSec,Hook::kTimeoutSec); + const_cast(body)["mediaServerId"] = mediaServerId; HttpRequester::Ptr requester(new HttpRequester); requester->setMethod("POST"); auto bodyStr = to_string(body); requester->setBody(bodyStr); - requester->addHeader("Content-Type",getContentType(body)); + requester->addHeader("Content-Type", getContentType(body)); std::shared_ptr pTicker(new Ticker); - requester->startRequester(url,[url,fun,bodyStr,requester,pTicker](const SockException &ex, - const string &status, - const HttpClient::HttpHeader &header, - const string &strRecvBody){ - onceToken token(nullptr,[&](){ + requester->startRequester(url, [url, func, bodyStr, requester, pTicker](const SockException &ex, + const string &status, + const HttpClient::HttpHeader &header, + const string &strRecvBody) { + onceToken token(nullptr, [&]() { const_cast(requester).reset(); }); parse_http_response(ex,status,header,strRecvBody,[&](const Value &obj,const string &err){ - if(fun){ - fun(obj,err); + if (func) { + func(obj, err); } - if(!err.empty()) { - WarnL << "hook " << url << " " <elapsedTime() << "ms,failed" << err << ":" << bodyStr; - }else if(pTicker->elapsedTime() > 500){ - DebugL << "hook " << url << " " <elapsedTime() << "ms,success:" << bodyStr; + if (!err.empty()) { + WarnL << "hook " << url << " " << pTicker->elapsedTime() << "ms,failed" << err << ":" << bodyStr; + } else if (pTicker->elapsedTime() > 500) { + DebugL << "hook " << url << " " << pTicker->elapsedTime() << "ms,success:" << bodyStr; } }); - },hook_timeoutSec); + }, hook_timeoutSec); } static ArgsType make_json(const MediaInfo &args){ diff --git a/server/WebHook.h b/server/WebHook.h old mode 100644 new mode 100755 index 0d39278e..6cde95b0 --- a/server/WebHook.h +++ b/server/WebHook.h @@ -12,13 +12,32 @@ #define ZLMEDIAKIT_WEBHOOK_H #include +#include +#include "jsoncpp/json.h" using namespace std; +using namespace Json; + +//支持json或urlencoded方式传输参数 +#define JSON_ARGS + +#ifdef JSON_ARGS +typedef Value ArgsType; +#else +typedef HttpArgs ArgsType; +#endif namespace Hook { +//web hook回复最大超时时间 extern const string kTimeoutSec; }//namespace Hook void installWebHook(); void unInstallWebHook(); - +/** + * 触发http hook请求 + * @param url 请求地址 + * @param body 请求body + * @param func 回调 + */ +void do_http_hook(const string &url, const ArgsType &body, const function &func = nullptr); #endif //ZLMEDIAKIT_WEBHOOK_H diff --git a/src/Common/MediaSource.cpp b/src/Common/MediaSource.cpp index afe42ff7..966620f7 100644 --- a/src/Common/MediaSource.cpp +++ b/src/Common/MediaSource.cpp @@ -183,21 +183,21 @@ bool MediaSource::isRecording(Recorder::type type){ return listener->isRecording(*this, type); } -void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MediaSource::startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (!listener) { cb(SockException(Err_other, "尚未设置事件监听器")); return; } - return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, cb); + return listener->startSendRtp(*this, dst_url, dst_port, ssrc, is_udp, src_port, cb); } -bool MediaSource::stopSendRtp() { +bool MediaSource::stopSendRtp(const string &ssrc) { auto listener = _listener.lock(); if (!listener) { return false; } - return listener->stopSendRtp(*this); + return listener->stopSendRtp(*this, ssrc); } void MediaSource::for_each_media(const function &cb) { @@ -642,19 +642,19 @@ vector MediaSourceEventInterceptor::getTracks(MediaSource &sender, b return listener->getTracks(sender, trackReady); } -void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MediaSourceEventInterceptor::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ auto listener = _listener.lock(); if (listener) { - listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + listener->startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb); } else { - MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, cb); + MediaSourceEvent::startSendRtp(sender, dst_url, dst_port, ssrc, is_udp, src_port, cb); } } -bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender){ +bool MediaSourceEventInterceptor::stopSendRtp(MediaSource &sender, const string &ssrc){ auto listener = _listener.lock(); if (listener) { - return listener->stopSendRtp(sender); + return listener->stopSendRtp(sender, ssrc); } return false; } diff --git a/src/Common/MediaSource.h b/src/Common/MediaSource.h index 965e1e57..0095ee22 100644 --- a/src/Common/MediaSource.h +++ b/src/Common/MediaSource.h @@ -83,9 +83,9 @@ public: // 获取所有track相关信息 virtual vector getTracks(MediaSource &sender, bool trackReady = true) const { return vector(); }; // 开始发送ps-rtp - virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) { cb(SockException(Err_other, "not implemented"));}; + virtual void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) { cb(SockException(Err_other, "not implemented"));}; // 停止发送ps-rtp - virtual bool stopSendRtp(MediaSource &sender) {return false; } + virtual bool stopSendRtp(MediaSource &sender, const string &ssrc) {return false; } private: Timer::Ptr _async_close_timer; @@ -112,8 +112,8 @@ public: bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const string &custom_path) override; bool isRecording(MediaSource &sender, Recorder::type type) override; vector getTracks(MediaSource &sender, bool trackReady = true) const override; - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; - bool stopSendRtp(MediaSource &sender) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; + bool stopSendRtp(MediaSource &sender, const string &ssrc) override; private: std::weak_ptr _listener; @@ -256,9 +256,9 @@ public: // 获取录制状态 bool isRecording(Recorder::type type); // 开始发送ps-rtp - void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb); + void startSendRtp(const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb); // 停止发送ps-rtp - bool stopSendRtp(); + bool stopSendRtp(const string &ssrc); ////////////////static方法,查找或生成MediaSource//////////////// diff --git a/src/Common/MultiMediaSourceMuxer.cpp b/src/Common/MultiMediaSourceMuxer.cpp index 816993bd..f92b42ec 100644 --- a/src/Common/MultiMediaSourceMuxer.cpp +++ b/src/Common/MultiMediaSourceMuxer.cpp @@ -359,11 +359,11 @@ bool MultiMediaSourceMuxer::isRecording(MediaSource &sender, Recorder::type type return _muxer->isRecording(sender,type); } -void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb){ +void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb){ #if defined(ENABLE_RTPPROXY) RtpSender::Ptr rtp_sender = std::make_shared(atoi(ssrc.data())); weak_ptr weak_self = shared_from_this(); - rtp_sender->startSend(dst_url, dst_port, is_udp, [weak_self, rtp_sender, cb](const SockException &ex) { + rtp_sender->startSend(dst_url, dst_port, is_udp, src_port, [weak_self, rtp_sender, cb, ssrc](const SockException &ex) { cb(ex); auto strong_self = weak_self.lock(); if (!strong_self || ex) { @@ -373,24 +373,36 @@ void MultiMediaSourceMuxer::startSendRtp(MediaSource &sender, const string &dst_ rtp_sender->addTrack(track); } rtp_sender->addTrackCompleted(); - strong_self->_rtp_sender = rtp_sender; + lock_guard lck(strong_self->_rtp_sender_mtx); + strong_self->_rtp_sender[ssrc] = rtp_sender; }); #else cb(SockException(Err_other, "该功能未启用,编译时请打开ENABLE_RTPPROXY宏")); #endif//ENABLE_RTPPROXY } -bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender){ +bool MultiMediaSourceMuxer::stopSendRtp(MediaSource &sender, const string& ssrc){ #if defined(ENABLE_RTPPROXY) - if (_rtp_sender) { - _rtp_sender = nullptr; - return true; + if (ssrc.empty()) { + //关闭全部 + lock_guard lck(_rtp_sender_mtx); + auto size = _rtp_sender.size(); + _rtp_sender.clear(); + return size; } -#endif//ENABLE_RTPPROXY + //关闭特定的 + lock_guard lck(_rtp_sender_mtx); + return _rtp_sender.erase(ssrc); +#else return false; +#endif//ENABLE_RTPPROXY } void MultiMediaSourceMuxer::addTrack(const Track::Ptr &track) { + if (CodecL16 == track->getCodecId()) { + WarnL << "L16音频格式目前只支持RTSP协议推流拉流!!!"; + return; + } _muxer->addTrack(track); } @@ -472,9 +484,9 @@ void MultiMediaSourceMuxer::inputFrame(const Frame::Ptr &frame_in) { _muxer->inputFrame(frame); #if defined(ENABLE_RTPPROXY) - auto rtp_sender = _rtp_sender; - if (rtp_sender) { - rtp_sender->inputFrame(frame); + lock_guard lck(_rtp_sender_mtx); + for (auto &pr : _rtp_sender) { + pr.second->inputFrame(frame); } #endif //ENABLE_RTPPROXY @@ -486,7 +498,7 @@ bool MultiMediaSourceMuxer::isEnabled(){ //无人观看时,每次检查是否真的无人观看 //有人观看时,则延迟一定时间检查一遍是否无人观看了(节省性能) #if defined(ENABLE_RTPPROXY) - _is_enable = (_muxer->isEnabled() || _rtp_sender); + _is_enable = (_muxer->isEnabled() || _rtp_sender.size()); #else _is_enable = _muxer->isEnabled(); #endif //ENABLE_RTPPROXY diff --git a/src/Common/MultiMediaSourceMuxer.h b/src/Common/MultiMediaSourceMuxer.h index c257d3d2..d1bdc3bf 100644 --- a/src/Common/MultiMediaSourceMuxer.h +++ b/src/Common/MultiMediaSourceMuxer.h @@ -143,13 +143,13 @@ public: * @param is_udp 是否为udp * @param cb 启动成功或失败回调 */ - void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, const function &cb) override; + void startSendRtp(MediaSource &sender, const string &dst_url, uint16_t dst_port, const string &ssrc, bool is_udp, uint16_t src_port, const function &cb) override; /** * 停止ps-rtp发送 * @return 是否成功 */ - bool stopSendRtp(MediaSource &sender) override; + bool stopSendRtp(MediaSource &sender, const string &ssrc) override; /////////////////////////////////MediaSinkInterface override///////////////////////////////// @@ -190,7 +190,8 @@ private: MultiMuxerPrivate::Ptr _muxer; std::weak_ptr _track_listener; #if defined(ENABLE_RTPPROXY) - RtpSender::Ptr _rtp_sender; + mutex _rtp_sender_mtx; + unordered_map _rtp_sender; #endif //ENABLE_RTPPROXY }; diff --git a/src/Common/Parser.cpp b/src/Common/Parser.cpp index 1c866a64..abb749dd 100644 --- a/src/Common/Parser.cpp +++ b/src/Common/Parser.cpp @@ -137,9 +137,13 @@ StrCaseMap Parser::parseArgs(const string &str, const char *pair_delim, const ch StrCaseMap ret; auto arg_vec = split(str, pair_delim); for (string &key_val : arg_vec) { - auto key = FindField(key_val.data(), NULL, key_delim); - auto val = FindField(key_val.data(), key_delim, NULL); - ret.emplace_force(trim(key), trim(val)); + auto key = trim(FindField(key_val.data(), NULL, key_delim)); + if (!key.empty()) { + auto val = trim(FindField(key_val.data(), key_delim, NULL)); + ret.emplace_force(key, val); + } else { + ret.emplace_force(key_val, ""); + } } return ret; } diff --git a/src/Extension/Factory.cpp b/src/Extension/Factory.cpp index fba7109b..fa2248de 100644 --- a/src/Extension/Factory.cpp +++ b/src/Extension/Factory.cpp @@ -20,6 +20,7 @@ #include "CommonRtp.h" #include "Opus.h" #include "G711.h" +#include "L16.h" #include "Common/Parser.h" namespace mediakit{ @@ -56,6 +57,10 @@ Track::Ptr Factory::getTrackBySdp(const SdpTrack::Ptr &track) { return std::make_shared(CodecG711U, track->_samplerate, track->_channel, 16); } + if (strcasecmp(track->_codec.data(), "L16") == 0) { + return std::make_shared(track->_samplerate, track->_channel); + } + if (strcasecmp(track->_codec.data(), "h264") == 0) { //a=fmtp:96 packetization-mode=1;profile-level-id=42C01F;sprop-parameter-sets=Z0LAH9oBQBboQAAAAwBAAAAPI8YMqA==,aM48gA== auto map = Parser::parseArgs(FindField(track->_fmtp.data()," ", nullptr),";","="); @@ -123,6 +128,7 @@ RtpCodec::Ptr Factory::getRtpEncoderBySdp(const Sdp::Ptr &sdp) { case CodecH264 : return std::make_shared(ssrc, mtu, sample_rate, pt, interleaved); case CodecH265 : return std::make_shared(ssrc, mtu, sample_rate, pt, interleaved); case CodecAAC : return std::make_shared(ssrc, mtu, sample_rate, pt, interleaved); + case CodecL16 : case CodecOpus : case CodecG711A : case CodecG711U : return std::make_shared(codec_id, ssrc, mtu, sample_rate, pt, interleaved); @@ -135,6 +141,7 @@ RtpCodec::Ptr Factory::getRtpDecoderByTrack(const Track::Ptr &track) { case CodecH264 : return std::make_shared(); case CodecH265 : return std::make_shared(); case CodecAAC : return std::make_shared(track->clone()); + case CodecL16 : case CodecOpus : case CodecG711A : case CodecG711U : return std::make_shared(track->getCodecId()); diff --git a/src/Extension/Frame.cpp b/src/Extension/Frame.cpp index ec8a3608..45c2db38 100644 --- a/src/Extension/Frame.cpp +++ b/src/Extension/Frame.cpp @@ -80,6 +80,7 @@ const char *getCodecName(CodecId codecId) { SWITCH_CASE(CodecG711A); SWITCH_CASE(CodecG711U); SWITCH_CASE(CodecOpus); + SWITCH_CASE(CodecL16); default : return "unknown codec"; } } @@ -91,7 +92,8 @@ TrackType getTrackType(CodecId codecId){ case CodecAAC: case CodecG711A: case CodecG711U: - case CodecOpus: return TrackAudio; + case CodecOpus: + case CodecL16: return TrackAudio; default: return TrackInvalid; } } diff --git a/src/Extension/Frame.h b/src/Extension/Frame.h index a967392c..039e3395 100644 --- a/src/Extension/Frame.h +++ b/src/Extension/Frame.h @@ -29,6 +29,7 @@ typedef enum { CodecG711A, CodecG711U, CodecOpus, + CodecL16, CodecMax = 0x7FFF } CodecId; diff --git a/src/Extension/L16.cpp b/src/Extension/L16.cpp new file mode 100644 index 00000000..8cc0c977 --- /dev/null +++ b/src/Extension/L16.cpp @@ -0,0 +1,26 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#include "L16.h" + +namespace mediakit{ + +Sdp::Ptr L16Track::getSdp() { + WarnL << "Enter L16Track::getSdp function"; + if(!ready()){ + WarnL << getCodecName() << " Track未准备好"; + return nullptr; + } + return std::make_shared(getCodecId(), getAudioSampleRate(), getAudioChannel(), getBitRate() / 1024); +} + +}//namespace mediakit + + diff --git a/src/Extension/L16.h b/src/Extension/L16.h new file mode 100644 index 00000000..38b3f131 --- /dev/null +++ b/src/Extension/L16.h @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved. + * + * This file is part of ZLMediaKit(https://github.com/xiongziliang/ZLMediaKit). + * + * Use of this source code is governed by MIT license that can be found in the + * LICENSE file in the root of the source tree. All contributing project authors + * may be found in the AUTHORS file in the root of the source tree. + */ + +#ifndef ZLMEDIAKIT_L16_H +#define ZLMEDIAKIT_L16_H + +#include "Frame.h" +#include "Track.h" + +namespace mediakit{ + +/** + * L16音频通道 + */ +class L16Track : public AudioTrackImp{ +public: + typedef std::shared_ptr Ptr; + L16Track(int sample_rate, int channels) : AudioTrackImp(CodecL16,sample_rate,channels,16){} + +private: + //克隆该Track + Track::Ptr clone() override { + return std::make_shared::type >(*this); + } + //生成sdp + Sdp::Ptr getSdp() override ; +}; + +/** + * L16类型SDP + */ +class L16Sdp : public Sdp { +public: + /** + * L16采样位数固定为16位 + * @param codecId CodecL16 + * @param sample_rate 音频采样率 + * @param payload_type rtp payload + * @param bitrate 比特率 + */ + L16Sdp(CodecId codecId, + int sample_rate, + int channels, + int bitrate = 128, + int payload_type = 98) : Sdp(sample_rate,payload_type), _codecId(codecId){ + _printer << "m=audio 0 RTP/AVP " << payload_type << "\r\n"; + if (bitrate) { + _printer << "b=AS:" << bitrate << "\r\n"; + } + _printer << "a=rtpmap:" << payload_type << " L16/" << sample_rate << "/" << channels << "\r\n"; + _printer << "a=control:trackID=" << (int)TrackAudio << "\r\n"; + } + + string getSdp() const override { + return _printer; + } + + CodecId getCodecId() const override { + return _codecId; + } +private: + _StrPrinter _printer; + CodecId _codecId; +}; + +}//namespace mediakit +#endif //ZLMEDIAKIT_L16_H \ No newline at end of file diff --git a/src/Rtp/RtpProcess.cpp b/src/Rtp/RtpProcess.cpp index 253ed9a5..9555c3cc 100644 --- a/src/Rtp/RtpProcess.cpp +++ b/src/Rtp/RtpProcess.cpp @@ -28,6 +28,7 @@ RtpProcess::RtpProcess(const string &stream_id) { _media_info._vhost = DEFAULT_VHOST; _media_info._app = RTP_APP_NAME; _media_info._streamid = stream_id; + _stop_rtp_check.store(false); GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir); { @@ -143,6 +144,10 @@ bool RtpProcess::alive() { } } + if (_stop_rtp_check.load()) { + return true; + } + GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec) if(_last_frame_time.elapsedTime() / 1000 < timeoutSec){ return true; @@ -150,6 +155,10 @@ bool RtpProcess::alive() { return false; } +void RtpProcess::setStopCheckRtp(bool is_check){ + _stop_rtp_check = is_check; +} + void RtpProcess::onDetach() { if (_on_detach) { _on_detach(); diff --git a/src/Rtp/RtpProcess.h b/src/Rtp/RtpProcess.h index c3871488..c32a8080 100644 --- a/src/Rtp/RtpProcess.h +++ b/src/Rtp/RtpProcess.h @@ -53,6 +53,11 @@ public: */ void setOnDetach(const function &cb); + /** + * 设置onDetach事件回调,false检查RTP超时,true停止 + */ + void setStopCheckRtp(bool is_check=false); + /// SockInfo override string get_local_ip() override; uint16_t get_local_port() override; @@ -94,7 +99,7 @@ private: std::shared_ptr _save_file_video; ProcessInterface::Ptr _process; MultiMediaSourceMuxer::Ptr _muxer; - + std::atomic_bool _stop_rtp_check; bool _paused = false; Ticker _pause_rtp_time; }; diff --git a/src/Rtp/RtpSender.cpp b/src/Rtp/RtpSender.cpp index 8fdb531a..50710a1e 100644 --- a/src/Rtp/RtpSender.cpp +++ b/src/Rtp/RtpSender.cpp @@ -26,14 +26,15 @@ RtpSender::RtpSender(uint32_t ssrc, uint8_t payload_type) { RtpSender::~RtpSender() { } -void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb){ +void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb){ _is_udp = is_udp; _socket = Socket::createSocket(_poller, false); _dst_url = dst_url; _dst_port = dst_port; + _src_port = src_port; weak_ptr weak_self = shared_from_this(); if (is_udp) { - _socket->bindUdpSock(0); + _socket->bindUdpSock(src_port); auto poller = _poller; WorkThreadPool::Instance().getPoller()->async([cb, dst_url, dst_port, weak_self, poller]() { struct sockaddr addr; @@ -65,7 +66,7 @@ void RtpSender::startSend(const string &dst_url, uint16_t dst_port, bool is_udp, //tcp连接成功 strong_self->onConnect(); } - }); + }, 5.0F, "0.0.0.0", src_port); } } @@ -149,7 +150,7 @@ void RtpSender::onErr(const SockException &ex, bool is_connect) { if (!strong_self) { return false; } - strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, [weak_self](const SockException &ex){ + strong_self->startSend(strong_self->_dst_url, strong_self->_dst_port, strong_self->_is_udp, strong_self->_src_port, [weak_self](const SockException &ex){ auto strong_self = weak_self.lock(); if (strong_self && ex) { //连接失败且本对象未销毁,那么重试连接 diff --git a/src/Rtp/RtpSender.h b/src/Rtp/RtpSender.h index 86a58894..840792f7 100644 --- a/src/Rtp/RtpSender.h +++ b/src/Rtp/RtpSender.h @@ -37,7 +37,7 @@ public: * @param is_udp 是否采用udp方式发送rtp * @param cb 连接目标端口是否成功的回调 */ - void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, const function &cb); + void startSend(const string &dst_url, uint16_t dst_port, bool is_udp, uint16_t src_port, const function &cb); /** * 输入帧数据 @@ -74,6 +74,7 @@ private: bool _is_connect = false; string _dst_url; uint16_t _dst_port; + uint16_t _src_port; Socket::Ptr _socket; EventPoller::Ptr _poller; Timer::Ptr _connect_timer; diff --git a/src/Rtp/RtpServer.cpp b/src/Rtp/RtpServer.cpp index d192416d..cb4f79d9 100644 --- a/src/Rtp/RtpServer.cpp +++ b/src/Rtp/RtpServer.cpp @@ -90,5 +90,16 @@ uint16_t RtpServer::getPort() { return _udp_server ? _udp_server->get_local_port() : 0; } +void RtpServer::pauseRtpCheck(){ + if(_rtp_process) + _rtp_process->setStopCheckRtp(true); +} + +void RtpServer::resumeRtpCheck(){ + if(_rtp_process) + _rtp_process->setStopCheckRtp(false); + +} + }//namespace mediakit #endif//defined(ENABLE_RTPPROXY) \ No newline at end of file diff --git a/src/Rtp/RtpServer.h b/src/Rtp/RtpServer.h index bcc045bc..cc3d5aaa 100644 --- a/src/Rtp/RtpServer.h +++ b/src/Rtp/RtpServer.h @@ -57,6 +57,16 @@ public: */ void setOnDetach(const function &cb); + /** + * 暂停Rtp服务的RTP流检测 + */ + void pauseRtpCheck(); + + /** + * 恢复Rtp服务的RTP流检测 + */ + void resumeRtpCheck(); + protected: Socket::Ptr _udp_server; TcpServer::Ptr _tcp_server; diff --git a/src/Rtsp/RtpReceiver.cpp b/src/Rtsp/RtpReceiver.cpp index 6e3826f9..977a3d4a 100644 --- a/src/Rtsp/RtpReceiver.cpp +++ b/src/Rtsp/RtpReceiver.cpp @@ -152,13 +152,16 @@ void RtpReceiver::setPoolSize(int size) { _rtp_pool.setSize(size); } -int RtpReceiver::getJitterSize(int track_index){ +int RtpReceiver::getJitterSize(int track_index) const{ return _rtp_sortor[track_index].getJitterSize(); } -int RtpReceiver::getCycleCount(int track_index){ +int RtpReceiver::getCycleCount(int track_index) const{ return _rtp_sortor[track_index].getCycleCount(); } +uint32_t RtpReceiver::getSSRC(int track_index) const{ + return _ssrc[track_index]; +} }//namespace mediakit diff --git a/src/Rtsp/RtpReceiver.h b/src/Rtsp/RtpReceiver.h index 14d6d991..f9a972f0 100644 --- a/src/Rtsp/RtpReceiver.h +++ b/src/Rtsp/RtpReceiver.h @@ -44,14 +44,14 @@ public: /** * 获取排序缓存长度 */ - int getJitterSize() { + int getJitterSize() const{ return _rtp_sort_cache_map.size(); } /** * 获取seq回环次数 */ - int getCycleCount() { + int getCycleCount() const{ return _seq_cycle_count; } @@ -184,8 +184,9 @@ protected: void clear(); void setPoolSize(int size); - int getJitterSize(int track_index); - int getCycleCount(int track_index); + int getJitterSize(int track_index) const; + int getCycleCount(int track_index) const; + uint32_t getSSRC(int track_index) const; private: uint32_t _ssrc[2] = {0, 0}; diff --git a/src/Rtsp/RtspPlayer.cpp b/src/Rtsp/RtspPlayer.cpp index 3ebde80f..7a47375f 100644 --- a/src/Rtsp/RtspPlayer.cpp +++ b/src/Rtsp/RtspPlayer.cpp @@ -252,40 +252,43 @@ void RtspPlayer::sendSetup(unsigned int track_idx) { void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) { if (parser.Url() != "200") { - throw std::runtime_error( - StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); + throw std::runtime_error(StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl); } if (track_idx == 0) { _session_id = parser["Session"]; - _session_id.append(";"); - _session_id = FindField(_session_id.data(), nullptr, ";"); } auto strTransport = parser["Transport"]; - if(strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos){ + if (strTransport.find("TCP") != string::npos || strTransport.find("interleaved") != string::npos) { _rtp_type = Rtsp::RTP_TCP; - }else if(strTransport.find("multicast") != string::npos){ + } else if (strTransport.find("multicast") != string::npos) { _rtp_type = Rtsp::RTP_MULTICAST; - }else{ + } else { _rtp_type = Rtsp::RTP_UDP; } - + auto transport_map = Parser::parseArgs(strTransport, ";", "="); RtspSplitter::enableRecvRtp(_rtp_type == Rtsp::RTP_TCP); + string ssrc = transport_map["ssrc"]; + if(!ssrc.empty()){ + sscanf(ssrc.data(), "%x", &_sdp_track[track_idx]->_ssrc); + } else{ + _sdp_track[track_idx]->_ssrc = 0; + } - if(_rtp_type == Rtsp::RTP_TCP) { - string interleaved = FindField( FindField((strTransport + ";").data(), "interleaved=", ";").data(), NULL, "-"); - _sdp_track[track_idx]->_interleaved = atoi(interleaved.data()); - }else{ - const char *strPos = (_rtp_type == Rtsp::RTP_MULTICAST ? "port=" : "server_port=") ; - auto port_str = FindField((strTransport + ";").data(), strPos, ";"); - uint16_t rtp_port = atoi(FindField(port_str.data(), NULL, "-").data()); - uint16_t rtcp_port = atoi(FindField(port_str.data(), "-",NULL).data()); + if (_rtp_type == Rtsp::RTP_TCP) { + int interleaved_rtp, interleaved_rtcp; + sscanf(transport_map["interleaved"].data(), "%d-%d", &interleaved_rtp, &interleaved_rtcp); + _sdp_track[track_idx]->_interleaved = interleaved_rtp; + } else { + auto port_str = transport_map[(_rtp_type == Rtsp::RTP_MULTICAST ? "port" : "server_port")]; + int rtp_port, rtcp_port; + sscanf(port_str.data(), "%d-%d", &rtp_port, &rtcp_port); auto &pRtpSockRef = _rtp_sock[track_idx]; auto &pRtcpSockRef = _rtcp_sock[track_idx]; if (_rtp_type == Rtsp::RTP_MULTICAST) { //udp组播 - auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";"); + auto multiAddr = transport_map["destination"]; pRtpSockRef = createSocket(); if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) { pRtpSockRef.reset(); @@ -383,7 +386,7 @@ void RtspPlayer::sendKeepAlive(){ _on_response = [this](const Parser& parser){}; if(_supported_cmd.find("GET_PARAMETER") != _supported_cmd.end()){ //支持GET_PARAMETER,用此命令保活 - sendRtspRequest("GET_PARAMETER", _play_url); + sendRtspRequest("GET_PARAMETER", _content_base); }else{ //不支持GET_PARAMETER,用OPTIONS命令保活 sendRtspRequest("OPTIONS", _play_url); @@ -549,7 +552,7 @@ void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){ aui8Rtcp[0] = '$'; aui8Rtcp[1] = track->_interleaved + 1; - aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8; + aui8Rtcp[2] = (sizeof(aui8Rtcp) - 4) >> 8; aui8Rtcp[3] = (sizeof(aui8Rtcp) - 4) & 0xFF; pui8Rtcp_RR[0] = 0x81;/* 1 report block */ @@ -557,11 +560,13 @@ void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){ pui8Rtcp_RR[2] = 0x00; pui8Rtcp_RR[3] = 0x07;/* length in words - 1 */ - uint32_t ssrc=htonl(track->_ssrc + 1); + auto track_ssrc = track->_ssrc ? track->_ssrc : getSSRC(track_idx); // our own SSRC: we use the server's SSRC + 1 to avoid conflicts + uint32_t ssrc = htonl(track_ssrc + 1); memcpy(&pui8Rtcp_RR[4], &ssrc, 4); - ssrc=htonl(track->_ssrc); + // server SSRC + ssrc = htonl(track_ssrc); memcpy(&pui8Rtcp_RR[8], &ssrc, 4); //FIXME: 8 bits of fraction, 24 bits of total packets lost @@ -577,9 +582,9 @@ void RtspPlayer::sendReceiverReport(bool over_tcp, int track_idx){ pui8Rtcp_RR[18] = counter.pktCnt >> 8; pui8Rtcp_RR[19] = counter.pktCnt & 0xFF; - uint32_t jitter = htonl(getJitterSize(track_idx)); + uint32_t jitter = htonl(getJitterSize(track_idx)); //FIXME: jitter - memcpy(pui8Rtcp_RR + 20, &jitter , 4); + memcpy(pui8Rtcp_RR + 20, &jitter, 4); /* last SR timestamp */ memcpy(pui8Rtcp_RR + 24, &counter.lastTimeStamp, 4); uint32_t msInc = htonl(ntohl(counter.timeStamp) - ntohl(counter.lastTimeStamp)); diff --git a/src/Rtsp/RtspSession.cpp b/src/Rtsp/RtspSession.cpp index 40c42f47..d016fa90 100644 --- a/src/Rtsp/RtspSession.cpp +++ b/src/Rtsp/RtspSession.cpp @@ -106,7 +106,7 @@ void RtspSession::onManager() { } } - if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000) { + if ((_rtp_type == Rtsp::RTP_UDP || _push_src ) && _alive_ticker.elapsedTime() > keep_alive_sec * 1000 && _enable_send_rtp) { //如果是推流端或者rtp over udp类型的播放端,那么就做超时检测 shutdown(SockException(Err_timeout,"rtp over udp session timeouted")); return;