This commit is contained in:
xgj
2021-06-24 11:53:15 +08:00
29 changed files with 800 additions and 263 deletions

View File

@@ -260,6 +260,8 @@ const string kFileBufSize = HLS_FIELD"fileBufSize";
const string kFilePath = HLS_FIELD"filePath";
// 是否广播 ts 切片完成通知
const string kBroadcastRecordTs = HLS_FIELD"broadcastRecordTs";
//hls直播文件删除延时单位秒
const string kDeleteDelaySec = HLS_FIELD"deleteDelaySec";
onceToken token([](){
mINI::Instance()[kSegmentDuration] = 2;
@@ -268,6 +270,7 @@ onceToken token([](){
mINI::Instance()[kFileBufSize] = 64 * 1024;
mINI::Instance()[kFilePath] = "./www";
mINI::Instance()[kBroadcastRecordTs] = false;
mINI::Instance()[kDeleteDelaySec] = 0;
},nullptr);
} //namespace Hls

View File

@@ -293,6 +293,8 @@ extern const string kFileBufSize;
extern const string kFilePath;
// 是否广播 ts 切片完成通知
extern const string kBroadcastRecordTs;
//hls直播文件删除延时单位秒
extern const string kDeleteDelaySec;
} //namespace Hls
////////////Rtp代理相关配置///////////

View File

@@ -0,0 +1,99 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#include "PusherProxy.h"
using namespace toolkit;
namespace mediakit {
PusherProxy::PusherProxy(const MediaSource::Ptr &src, int retry_count, const EventPoller::Ptr &poller)
: MediaPusher(src, poller){
_retry_count = retry_count;
_on_close = [](const SockException &) {};
_weak_src = src;
}
PusherProxy::~PusherProxy() {
_timer.reset();
}
void PusherProxy::setPushCallbackOnce(const function<void(const SockException &ex)> &cb) {
_on_publish = cb;
}
void PusherProxy::setOnClose(const function<void(const SockException &ex)> &cb) {
_on_close = cb;
}
void PusherProxy::publish(const string &dst_url) {
std::weak_ptr<PusherProxy> weak_self = shared_from_this();
std::shared_ptr<int> failed_cnt(new int(0));
setOnPublished([weak_self, dst_url, failed_cnt](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (strong_self->_on_publish) {
strong_self->_on_publish(err);
strong_self->_on_publish = nullptr;
}
auto src = strong_self->_weak_src.lock();
if (!err) {
// 推流成功
*failed_cnt = 0;
InfoL << "Publish " << dst_url << " success";
} else if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
// 推流失败,延时重试推送
strong_self->rePublish(dst_url, (*failed_cnt)++);
} else {
//如果媒体源已经注销, 或达到了最大重试次数,回调关闭
strong_self->_on_close(err);
}
});
setOnShutdown([weak_self, dst_url, failed_cnt](const SockException &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
auto src = strong_self->_weak_src.lock();
//推流异常中断,延时重试播放
if (src && (*failed_cnt < strong_self->_retry_count || strong_self->_retry_count < 0)) {
strong_self->rePublish(dst_url, (*failed_cnt)++);
} else {
//如果媒体源已经注销, 或达到了最大重试次数,回调关闭
strong_self->_on_close(err);
}
});
MediaPusher::publish(dst_url);
}
void PusherProxy::rePublish(const string &dst_url, int failed_cnt) {
auto delay = MAX(2 * 1000, MIN(failed_cnt * 3000, 60 * 1000));
weak_ptr<PusherProxy> weak_self = shared_from_this();
_timer = std::make_shared<Timer>(delay / 1000.0f, [weak_self, dst_url, failed_cnt]() {
//推流失败次数越多,则延时越长
auto strong_self = weak_self.lock();
if (!strong_self) {
return false;
}
WarnL << "推流重试[" << failed_cnt << "]:" << dst_url;
strong_self->MediaPusher::publish(dst_url);
return false;
}, getPoller());
}
} /* namespace mediakit */

63
src/Pusher/PusherProxy.h Normal file
View File

@@ -0,0 +1,63 @@
/*
* Copyright (c) 2016 The ZLMediaKit project authors. All Rights Reserved.
*
* This file is part of ZLMediaKit(https://github.com/xia-chu/ZLMediaKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef SRC_DEVICE_PUSHERPROXY_H
#define SRC_DEVICE_PUSHERPROXY_H
#include "Pusher/MediaPusher.h"
#include "Util/TimeTicker.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
class PusherProxy : public MediaPusher, public std::enable_shared_from_this<PusherProxy> {
public:
typedef std::shared_ptr<PusherProxy> Ptr;
// 如果retry_count<0,则一直重试播放否则重试retry_count次数
// 默认一直重试创建此对象时候需要外部保证MediaSource存在
PusherProxy(const MediaSource::Ptr &src, int retry_count = -1, const EventPoller::Ptr &poller = nullptr);
~PusherProxy() override;
/**
* 设置push结果回调只触发一次在publish执行之前有效
* @param cb 回调对象
*/
void setPushCallbackOnce(const function<void(const SockException &ex)> &cb);
/**
* 设置主动关闭回调
* @param cb 回调对象
*/
void setOnClose(const function<void(const SockException &ex)> &cb);
/**
* 开始拉流播放
* @param dstUrl 目标推流地址
*/
void publish(const string& dstUrl) override;
private:
// 重推逻辑函数
void rePublish(const string &dstUrl, int iFailedCnt);
private:
int _retry_count;
Timer::Ptr _timer;
std::weak_ptr<MediaSource> _weak_src;
function<void(const SockException &ex)> _on_close;
function<void(const SockException &ex)> _on_publish;
};
} /* namespace mediakit */
#endif //SRC_DEVICE_PUSHERPROXY_H

View File

@@ -23,6 +23,7 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file,
uint32_t bufSize,
float seg_duration,
uint32_t seg_number) : HlsMaker(seg_duration, seg_number) {
_poller = EventPollerPool::Instance().getPoller();
_path_prefix = m3u8_file.substr(0, m3u8_file.rfind('/'));
_path_hls = m3u8_file;
_params = params;
@@ -35,18 +36,30 @@ HlsMakerImp::HlsMakerImp(const string &m3u8_file,
}
HlsMakerImp::~HlsMakerImp() {
clearCache();
clearCache(false);
}
void HlsMakerImp::clearCache() {
void HlsMakerImp::clearCache(bool immediately) {
//录制完了
flushLastSegment(true);
if (isLive()) {
//hls直播才删除文件
clear();
_file = nullptr;
_segment_file_paths.clear();
if (!isLive()) {
return;
}
clear();
_file = nullptr;
_segment_file_paths.clear();
//hls直播才删除文件
GET_CONFIG(uint32_t, delay, Hls::kDeleteDelaySec);
if (!delay || immediately) {
File::delete_file(_path_prefix.data());
} else {
auto path_prefix = _path_prefix;
_poller->doDelayTask(delay * 1000, [path_prefix]() {
File::delete_file(path_prefix.data());
return 0;
});
}
}

View File

@@ -47,8 +47,9 @@ public:
/**
* 清空缓存
* @param immediately 时候立即删除
*/
void clearCache();
void clearCache(bool immediately = true);
protected:
string onOpenSegment(uint64_t index) override ;
@@ -69,6 +70,7 @@ private:
std::shared_ptr<FILE> _file;
std::shared_ptr<char> _file_buf;
HlsMediaSource::Ptr _media_src;
EventPoller::Ptr _poller;
map<uint64_t/*index*/,string/*file_path*/> _segment_file_paths;
};

View File

@@ -115,7 +115,14 @@ public:
class RtmpHeader {
public:
uint8_t flags;
#if __BYTE_ORDER == __BIG_ENDIAN
uint8_t fmt : 2;
uint8_t chunk_id : 6;
#else
uint8_t chunk_id : 6;
//0、1、2、3分别对应 12、8、4、1长度
uint8_t fmt : 2;
#endif
uint8_t time_stamp[3];
uint8_t body_size[3];
uint8_t type_id;

View File

@@ -210,7 +210,8 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P
buffer_header->setSize(sizeof(RtmpHeader));
//对rtmp头赋值如果使用整形赋值在arm android上可能由于数据对齐导致总线错误的问题
RtmpHeader *header = (RtmpHeader *) buffer_header->data();
header->flags = (chunk_id & 0x3f) | (0 << 6);
header->fmt = 0;
header->chunk_id = chunk_id;
header->type_id = type;
set_be24(header->time_stamp, ext_stamp ? 0xFFFFFF : stamp);
set_be24(header->body_size, (uint32_t)buf->size());
@@ -232,7 +233,9 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P
BufferRaw::Ptr buffer_flags = obtainBuffer();
buffer_flags->setCapacity(1);
buffer_flags->setSize(1);
buffer_flags->data()[0] = (chunk_id & 0x3f) | (3 << 6);
header = (RtmpHeader *) buffer_flags->data();
header->fmt = 3;
header->chunk_id = chunk_id;
size_t offset = 0;
size_t totalSize = sizeof(RtmpHeader);
@@ -523,15 +526,15 @@ const char* RtmpProtocol::handle_C2(const char *data, size_t len) {
return handle_rtmp(data + C1_HANDSHARK_SIZE, len - C1_HANDSHARK_SIZE);
}
static const size_t HEADER_LENGTH[] = {12, 8, 4, 1};
static constexpr size_t HEADER_LENGTH[] = {12, 8, 4, 1};
const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
auto ptr = data;
while (len) {
int offset = 0;
uint8_t flags = ptr[0];
size_t header_len = HEADER_LENGTH[flags >> 6];
_now_chunk_id = flags & 0x3f;
size_t offset = 0;
auto header = (RtmpHeader *) ptr;
auto header_len = HEADER_LENGTH[header->fmt];
_now_chunk_id = header->chunk_id;
switch (_now_chunk_id) {
case 0: {
//0 值表示二字节形式,并且 ID 范围 64 - 319
@@ -565,7 +568,7 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
//need more data
return ptr;
}
RtmpHeader &header = *((RtmpHeader *) (ptr + offset));
header = (RtmpHeader *) (ptr + offset);
auto &pr = _map_chunk_data[_now_chunk_id];
auto &now_packet = pr.first;
auto &last_packet = pr.second;
@@ -583,12 +586,12 @@ const char* RtmpProtocol::handle_rtmp(const char *data, size_t len) {
switch (header_len) {
case 12:
chunk_data.is_abs_stamp = true;
chunk_data.stream_index = load_le32(header.stream_index);
chunk_data.stream_index = load_le32(header->stream_index);
case 8:
chunk_data.body_size = load_be24(header.body_size);
chunk_data.type_id = header.type_id;
chunk_data.body_size = load_be24(header->body_size);
chunk_data.type_id = header->type_id;
case 4:
chunk_data.ts_field = load_be24(header.time_stamp);
chunk_data.ts_field = load_be24(header->time_stamp);
}
auto time_stamp = chunk_data.ts_field;

View File

@@ -47,6 +47,7 @@ void RtmpPusher::teardown() {
}
void RtmpPusher::onPublishResult(const SockException &ex, bool handshake_done) {
DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;

View File

@@ -103,19 +103,16 @@ static const char *getCodecName(int codec_id) {
void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t bytes, int finish){
switch (codecid) {
case PSI_STREAM_H264: {
InfoL << "got video track: H264";
onTrack(std::make_shared<H264Track>());
break;
}
case PSI_STREAM_H265: {
InfoL << "got video track: H265";
onTrack(std::make_shared<H265Track>());
break;
}
case PSI_STREAM_AAC: {
InfoL<< "got audio track: AAC";
onTrack(std::make_shared<AACTrack>());
break;
}
@@ -123,14 +120,12 @@ void DecoderImp::onStream(int stream, int codecid, const void *extra, size_t byt
case PSI_STREAM_AUDIO_G711A:
case PSI_STREAM_AUDIO_G711U: {
auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U;
InfoL << "got audio track: G711";
//G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了
onTrack(std::make_shared<G711Track>(codec, 8000, 1, 16));
break;
}
case PSI_STREAM_AUDIO_OPUS: {
InfoL << "got audio track: opus";
onTrack(std::make_shared<OpusTrack>());
break;
}
@@ -223,8 +218,11 @@ void DecoderImp::onStream(int stream,int codecid,const void *extra,size_t bytes,
#endif
void DecoderImp::onTrack(const Track::Ptr &track) {
_tracks[track->getTrackType()] = track;
_sink->addTrack(track);
if (!_tracks[track->getTrackType()]) {
_tracks[track->getTrackType()] = track;
_sink->addTrack(track);
InfoL << "got track: " << track->getCodecName();
}
}
void DecoderImp::onFrame(const Frame::Ptr &frame) {

View File

@@ -14,6 +14,8 @@
#include "Http/HttpTSPlayer.h"
#include "Extension/CommonRtp.h"
#include "Extension/H264Rtp.h"
#include "Extension/Factory.h"
#include "Extension/Opus.h"
namespace mediakit{
@@ -22,6 +24,40 @@ static inline bool checkTS(const uint8_t *packet, size_t bytes){
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
}
class RtpReceiverImp : public RtpReceiver {
public:
using Ptr = std::shared_ptr<RtpReceiverImp>;
RtpReceiverImp(int sample_rate, function<void(RtpPacket::Ptr rtp)> cb, function<void(const RtpPacket::Ptr &rtp)> cb_before = nullptr){
_sample_rate = sample_rate;
_on_sort = std::move(cb);
_on_before_sort = std::move(cb_before);
}
~RtpReceiverImp() override = default;
bool inputRtp(TrackType type, uint8_t *ptr, size_t len){
return handleOneRtp((int) type, type, _sample_rate, ptr, len);
}
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override {
_on_sort(std::move(rtp));
}
void onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index) override {
if (_on_before_sort) {
_on_before_sort(rtp);
}
}
private:
int _sample_rate;
function<void(RtpPacket::Ptr rtp)> _on_sort;
function<void(const RtpPacket::Ptr &rtp)> _on_before_sort;
};
///////////////////////////////////////////////////////////////////////////////////////////
GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface) {
assert(interface);
_media_info = media_info;
@@ -30,26 +66,80 @@ GB28181Process::GB28181Process(const MediaInfo &media_info, MediaSinkInterface *
GB28181Process::~GB28181Process() {}
bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
return handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len);
void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp) {
_rtp_decoder[rtp->getHeader()->pt]->inputRtp(rtp, false);
}
void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp, int) {
auto pt = rtp->getHeader()->pt;
if (!_rtp_decoder) {
bool GB28181Process::inputRtp(bool, const char *data, size_t data_len) {
RtpHeader *header = (RtpHeader *) data;
auto pt = header->pt;
auto &ref = _rtp_receiver[pt];
if (!ref) {
if (_rtp_receiver.size() > 2) {
//防止pt类型太多导致内存溢出
throw std::invalid_argument("rtp pt类型不得超过2种!");
}
switch (pt) {
case 98: {
//H264负载
_rtp_decoder = std::make_shared<H264RtpDecoder>();
_interface->addTrack(std::make_shared<H264Track>());
case 100: {
//opus负载
ref = std::make_shared<RtpReceiverImp>(48000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<OpusTrack>();
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
case 99: {
//H265负载
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<H265Track>();
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
case 98: {
//H264负载
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<H264Track>();
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
case 0:
//CodecG711U
case 8: {
//CodecG711A
ref = std::make_shared<RtpReceiverImp>(8000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
auto track = std::make_shared<G711Track>(pt == 0 ? CodecG711U : CodecG711A, 8000, 1, 16);
_interface->addTrack(track);
_rtp_decoder[pt] = Factory::getRtpDecoderByTrack(track);
break;
}
default: {
if (pt != 33 && pt != 96) {
WarnL << "rtp payload type未识别(" << (int) pt << "),已按ts或ps负载处理";
}
ref = std::make_shared<RtpReceiverImp>(90000,[this](RtpPacket::Ptr rtp) {
onRtpSorted(std::move(rtp));
});
//ts或ps负载
_rtp_decoder = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024);
_rtp_decoder[pt] = std::make_shared<CommonRtpDecoder>(CodecInvalid, 32 * 1024);
//设置dump目录
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
if (!dump_dir.empty()) {
@@ -65,13 +155,12 @@ void GB28181Process::onRtpSorted(RtpPacket::Ptr rtp, int) {
}
//设置frame回调
_rtp_decoder->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
_rtp_decoder[pt]->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame) {
onRtpDecode(frame);
}));
}
//解码rtp
_rtp_decoder->inputRtp(rtp, false);
return ref->inputRtp(TrackVideo, (unsigned char *) data, data_len);
}
const char *GB28181Process::onSearchPacketTail(const char *packet,size_t bytes){
@@ -96,8 +185,8 @@ const char *GB28181Process::onSearchPacketTail(const char *packet,size_t bytes){
}
void GB28181Process::onRtpDecode(const Frame::Ptr &frame) {
if (frame->getCodecId() == CodecH264) {
//这是H264
if (frame->getCodecId() != CodecInvalid) {
//这里不是ps或ts
_interface->inputFrame(frame);
return;
}

View File

@@ -21,7 +21,8 @@
namespace mediakit{
class GB28181Process : public HttpRequestSplitter, public RtpReceiver, public ProcessInterface{
class RtpReceiverImp;
class GB28181Process : public HttpRequestSplitter, public ProcessInterface{
public:
typedef std::shared_ptr<GB28181Process> Ptr;
GB28181Process(const MediaInfo &media_info, MediaSinkInterface *interface);
@@ -36,7 +37,7 @@ public:
bool inputRtp(bool, const char *data, size_t data_len) override;
protected:
void onRtpSorted(RtpPacket::Ptr rtp, int track_index) override ;
void onRtpSorted(RtpPacket::Ptr rtp);
const char *onSearchPacketTail(const char *data,size_t len) override;
ssize_t onRecvHeader(const char *data,size_t len) override { return 0; };
@@ -48,7 +49,8 @@ private:
DecoderImp::Ptr _decoder;
MediaSinkInterface *_interface;
std::shared_ptr<FILE> _save_file_ps;
std::shared_ptr<RtpCodec> _rtp_decoder;
unordered_map<uint8_t, std::shared_ptr<RtpCodec> > _rtp_decoder;
unordered_map<uint8_t, std::shared_ptr<RtpReceiverImp> > _rtp_receiver;
};
}//namespace mediakit

View File

@@ -13,14 +13,17 @@
#include "RtpProcess.h"
#include "Http/HttpTSPlayer.h"
#define RTP_APP_NAME "rtp"
static constexpr char kRtpAppName[] = "rtp";
//在创建_muxer对象前(也就是推流鉴权成功前)需要先缓存frame这样可以防止丢包提高体验
//但是同时需要控制缓冲长度防止内存溢出。200帧数据大概有10秒数据应该足矣等待鉴权hook返回
static constexpr size_t kMaxCachedFrame = 200;
namespace mediakit {
RtpProcess::RtpProcess(const string &stream_id) {
_media_info._schema = RTP_APP_NAME;
_media_info._schema = kRtpAppName;
_media_info._vhost = DEFAULT_VHOST;
_media_info._app = RTP_APP_NAME;
_media_info._app = kRtpAppName;
_media_info._streamid = stream_id;
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
@@ -78,11 +81,6 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
emitOnPublish();
}
if (!_muxer) {
//无权限推流
return false;
}
_total_bytes += len;
if (_save_file_rtp) {
uint16_t size = (uint16_t)len;
@@ -95,7 +93,7 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
}
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
if (!_muxer->isEnabled() && !dts_out && dump_dir.empty()) {
if (_muxer && !_muxer->isEnabled() && !dts_out && dump_dir.empty()) {
//无人访问、且不取时间戳、不导出调试文件时,我们可以直接丢弃数据
_last_frame_time.resetTime();
return false;
@@ -109,20 +107,55 @@ bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data
}
void RtpProcess::inputFrame(const Frame::Ptr &frame) {
_last_frame_time.resetTime();
_dts = frame->dts();
if (_save_file_video && frame->getTrackType() == TrackVideo) {
fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
}
_muxer->inputFrame(frame);
if (_muxer) {
_last_frame_time.resetTime();
_muxer->inputFrame(frame);
} else {
if (_cached_func.size() > kMaxCachedFrame) {
WarnL << "cached frame of track(" << frame->getCodecName() << ") is too much, now dropped";
return;
}
auto frame_cached = Frame::getCacheAbleFrame(frame);
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, frame_cached]() {
_last_frame_time.resetTime();
_muxer->inputFrame(frame_cached);
});
}
}
void RtpProcess::addTrack(const Track::Ptr &track) {
_muxer->addTrack(track);
if (_muxer) {
_muxer->addTrack(track);
} else {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this, track]() {
_muxer->addTrack(track);
});
}
}
void RtpProcess::addTrackCompleted() {
_muxer->addTrackCompleted();
if (_muxer) {
_muxer->addTrackCompleted();
} else {
lock_guard<recursive_mutex> lck(_func_mtx);
_cached_func.emplace_back([this]() {
_muxer->addTrackCompleted();
});
}
}
void RtpProcess::doCachedFunc() {
lock_guard<recursive_mutex> lck(_func_mtx);
for (auto &func : _cached_func) {
func();
}
_cached_func.clear();
}
bool RtpProcess::alive() {
@@ -197,19 +230,20 @@ void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
void RtpProcess::emitOnPublish() {
weak_ptr<RtpProcess> weak_self = shared_from_this();
Broadcast::PublishAuthInvoker invoker = [weak_self](const string &err, bool enableHls, bool enableMP4) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
auto strong_self = weak_self.lock();
if (!strong_self) {
return;
}
if (err.empty()) {
strongSelf->_muxer = std::make_shared<MultiMediaSourceMuxer>(strongSelf->_media_info._vhost,
strongSelf->_media_info._app,
strongSelf->_media_info._streamid, 0.0f,
strong_self->_muxer = std::make_shared<MultiMediaSourceMuxer>(strong_self->_media_info._vhost,
strong_self->_media_info._app,
strong_self->_media_info._streamid, 0.0f,
true, true, enableHls, enableMP4);
strongSelf->_muxer->setMediaListener(strongSelf);
InfoP(strongSelf) << "允许RTP推流";
strong_self->_muxer->setMediaListener(strong_self);
strong_self->doCachedFunc();
InfoP(strong_self) << "允许RTP推流";
} else {
WarnP(strongSelf) << "禁止RTP推流:" << err;
WarnP(strong_self) << "禁止RTP推流:" << err;
}
};

View File

@@ -79,6 +79,7 @@ protected:
private:
void emitOnPublish();
void doCachedFunc();
private:
uint32_t _dts = 0;
@@ -95,6 +96,8 @@ private:
atomic_bool _stop_rtp_check{false};
atomic_flag _busy_flag{false};
Ticker _last_check_alive;
recursive_mutex _func_mtx;
deque<function<void()> > _cached_func;
};
}//namespace mediakit

View File

@@ -95,6 +95,7 @@ void RtspPusher::publish(const string &url_str) {
}
void RtspPusher::onPublishResult(const SockException &ex, bool handshake_done) {
DebugL << ex.what();
if (ex.getErrCode() == Err_shutdown) {
//主动shutdown的不触发回调
return;