mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2026-06-13 11:41:44 +08:00
优化代码结构
This commit is contained in:
@@ -9,14 +9,17 @@
|
||||
*/
|
||||
|
||||
#if defined(ENABLE_RTPPROXY)
|
||||
#include "GB28181Process.h"
|
||||
#include "RtpProcess.h"
|
||||
#include "RtpSplitter.h"
|
||||
#include "Util/File.h"
|
||||
#include "Http/HttpTSPlayer.h"
|
||||
|
||||
#define RTP_APP_NAME "rtp"
|
||||
|
||||
namespace mediakit{
|
||||
namespace mediakit {
|
||||
|
||||
static string printAddress(const struct sockaddr *addr){
|
||||
static string printAddress(const struct sockaddr *addr) {
|
||||
return StrPrinter << SockUtil::inet_ntoa(((struct sockaddr_in *) addr)->sin_addr) << ":" << ntohs(((struct sockaddr_in *) addr)->sin_port);
|
||||
}
|
||||
|
||||
@@ -26,20 +29,11 @@ RtpProcess::RtpProcess(const string &stream_id) {
|
||||
_media_info._app = RTP_APP_NAME;
|
||||
_media_info._streamid = stream_id;
|
||||
|
||||
GET_CONFIG(string,dump_dir,RtpProxy::kDumpDir);
|
||||
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
|
||||
{
|
||||
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".rtp", dump_dir).data(), "wb") : nullptr;
|
||||
if(fp){
|
||||
_save_file_rtp.reset(fp,[](FILE *fp){
|
||||
fclose(fp);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".mp2", dump_dir).data(), "wb") : nullptr;
|
||||
if(fp){
|
||||
_save_file_ps.reset(fp,[](FILE *fp){
|
||||
if (fp) {
|
||||
_save_file_rtp.reset(fp, [](FILE *fp) {
|
||||
fclose(fp);
|
||||
});
|
||||
}
|
||||
@@ -47,20 +41,16 @@ RtpProcess::RtpProcess(const string &stream_id) {
|
||||
|
||||
{
|
||||
FILE *fp = !dump_dir.empty() ? File::create_file(File::absolutePath(_media_info._streamid + ".video", dump_dir).data(), "wb") : nullptr;
|
||||
if(fp){
|
||||
_save_file_video.reset(fp,[](FILE *fp){
|
||||
if (fp) {
|
||||
_save_file_video.reset(fp, [](FILE *fp) {
|
||||
fclose(fp);
|
||||
});
|
||||
}
|
||||
}
|
||||
_rtp_decoder = std::make_shared<CommonRtpDecoder>(CodecInvalid, 256 * 1024);
|
||||
_rtp_decoder->addDelegate(std::make_shared<FrameWriterInterfaceHelper>([this](const Frame::Ptr &frame){
|
||||
onRtpDecode((uint8_t *) frame->data(), frame->size(), frame->dts());
|
||||
}));
|
||||
}
|
||||
|
||||
RtpProcess::~RtpProcess() {
|
||||
uint64_t duration = (_last_rtp_time.createdTime() - _last_rtp_time.elapsedTime()) / 1000;
|
||||
uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000;
|
||||
WarnP(this) << "RTP推流器("
|
||||
<< _media_info._vhost << "/"
|
||||
<< _media_info._app << "/"
|
||||
@@ -79,95 +69,47 @@ RtpProcess::~RtpProcess() {
|
||||
}
|
||||
}
|
||||
|
||||
bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_len,const struct sockaddr *addr,uint32_t *dts_out) {
|
||||
GET_CONFIG(bool,check_source,RtpProxy::kCheckSource);
|
||||
bool RtpProcess::inputRtp(bool is_udp, const Socket::Ptr &sock, const char *data, int len, const struct sockaddr *addr, uint32_t *dts_out) {
|
||||
GET_CONFIG(bool, check_source, RtpProxy::kCheckSource);
|
||||
//检查源是否合法
|
||||
if(!_addr){
|
||||
if (!_addr) {
|
||||
_addr = new struct sockaddr;
|
||||
_sock = sock;
|
||||
memcpy(_addr,addr, sizeof(struct sockaddr));
|
||||
memcpy(_addr, addr, sizeof(struct sockaddr));
|
||||
DebugP(this) << "bind to address:" << printAddress(_addr);
|
||||
//推流鉴权
|
||||
emitOnPublish();
|
||||
}
|
||||
|
||||
if(!_muxer){
|
||||
if (!_muxer) {
|
||||
//无权限推流
|
||||
return false;
|
||||
}
|
||||
|
||||
if(check_source && memcmp(_addr,addr,sizeof(struct sockaddr)) != 0){
|
||||
if (check_source && memcmp(_addr, addr, sizeof(struct sockaddr)) != 0) {
|
||||
DebugP(this) << "address dismatch:" << printAddress(addr) << " != " << printAddress(_addr);
|
||||
return false;
|
||||
}
|
||||
|
||||
_total_bytes += data_len;
|
||||
bool ret = handleOneRtp(0, TrackVideo, 90000, (unsigned char *) data, data_len);
|
||||
if(dts_out){
|
||||
_total_bytes += len;
|
||||
if (_save_file_rtp) {
|
||||
uint16_t size = len;
|
||||
size = htons(size);
|
||||
fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get());
|
||||
fwrite((uint8_t *) data, len, 1, _save_file_rtp.get());
|
||||
}
|
||||
if (!_process) {
|
||||
_process = std::make_shared<GB28181Process>(_media_info, this);
|
||||
}
|
||||
bool ret = _process ? _process->inputRtp(is_udp, data, len) : false;
|
||||
if (dts_out) {
|
||||
*dts_out = _dts;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
//判断是否为ts负载
|
||||
static inline bool checkTS(const uint8_t *packet, int bytes){
|
||||
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
|
||||
}
|
||||
|
||||
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
|
||||
if(rtp->sequence != (uint16_t)(_sequence + 1) && _sequence != 0){
|
||||
WarnP(this) << "rtp丢包:" << rtp->sequence << " != " << _sequence << "+1" << ",公网环境下请使用tcp方式推流";
|
||||
}
|
||||
_sequence = rtp->sequence;
|
||||
if(_save_file_rtp){
|
||||
uint16_t size = rtp->size() - 4;
|
||||
size = htons(size);
|
||||
fwrite((uint8_t *) &size, 2, 1, _save_file_rtp.get());
|
||||
fwrite((uint8_t *) rtp->data() + 4, rtp->size() - 4, 1, _save_file_rtp.get());
|
||||
}
|
||||
_rtp_decoder->inputRtp(rtp);
|
||||
}
|
||||
|
||||
const char *RtpProcess::onSearchPacketTail(const char *packet,int bytes){
|
||||
try {
|
||||
auto ret = _decoder->input((uint8_t *) packet, bytes);
|
||||
if (ret > 0) {
|
||||
return packet + ret;
|
||||
}
|
||||
return nullptr;
|
||||
} catch (std::exception &ex) {
|
||||
InfoL << "解析ps或ts异常: bytes=" << bytes
|
||||
<< " ,exception=" << ex.what()
|
||||
<< " ,hex=" << hexdump((uint8_t *) packet, bytes);
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestamp) {
|
||||
if(_save_file_ps){
|
||||
fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
|
||||
}
|
||||
|
||||
if (!_decoder) {
|
||||
//创建解码器
|
||||
if (checkTS(packet, bytes)) {
|
||||
//猜测是ts负载
|
||||
InfoP(this) << "judged to be TS";
|
||||
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, this);
|
||||
} else {
|
||||
//猜测是ps负载
|
||||
InfoP(this) << "judged to be PS";
|
||||
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, this);
|
||||
}
|
||||
}
|
||||
|
||||
if (_decoder) {
|
||||
HttpRequestSplitter::input((char *) packet, bytes);
|
||||
}
|
||||
}
|
||||
|
||||
void RtpProcess::inputFrame(const Frame::Ptr &frame){
|
||||
_last_rtp_time.resetTime();
|
||||
void RtpProcess::inputFrame(const Frame::Ptr &frame) {
|
||||
_last_frame_time.resetTime();
|
||||
_dts = frame->dts();
|
||||
if (_save_file_video && frame->getTrackType() == TrackVideo) {
|
||||
fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
|
||||
@@ -175,20 +117,20 @@ void RtpProcess::inputFrame(const Frame::Ptr &frame){
|
||||
_muxer->inputFrame(frame);
|
||||
}
|
||||
|
||||
void RtpProcess::addTrack(const Track::Ptr & track){
|
||||
void RtpProcess::addTrack(const Track::Ptr &track) {
|
||||
_muxer->addTrack(track);
|
||||
}
|
||||
|
||||
bool RtpProcess::alive() {
|
||||
GET_CONFIG(int,timeoutSec,RtpProxy::kTimeoutSec)
|
||||
if(_last_rtp_time.elapsedTime() / 1000 < timeoutSec){
|
||||
GET_CONFIG(int, timeoutSec, RtpProxy::kTimeoutSec)
|
||||
if (_last_frame_time.elapsedTime() / 1000 < timeoutSec) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void RtpProcess::onDetach(){
|
||||
if(_on_detach){
|
||||
void RtpProcess::onDetach() {
|
||||
if (_on_detach) {
|
||||
_on_detach();
|
||||
}
|
||||
}
|
||||
@@ -198,45 +140,45 @@ void RtpProcess::setOnDetach(const function<void()> &cb) {
|
||||
}
|
||||
|
||||
string RtpProcess::get_peer_ip() {
|
||||
if(_addr){
|
||||
if (_addr) {
|
||||
return SockUtil::inet_ntoa(((struct sockaddr_in *) _addr)->sin_addr);
|
||||
}
|
||||
return "0.0.0.0";
|
||||
}
|
||||
|
||||
uint16_t RtpProcess::get_peer_port() {
|
||||
if(!_addr){
|
||||
if (!_addr) {
|
||||
return 0;
|
||||
}
|
||||
return ntohs(((struct sockaddr_in *) _addr)->sin_port);
|
||||
}
|
||||
|
||||
string RtpProcess::get_local_ip() {
|
||||
if(_sock){
|
||||
if (_sock) {
|
||||
return _sock->get_local_ip();
|
||||
}
|
||||
return "0.0.0.0";
|
||||
}
|
||||
|
||||
uint16_t RtpProcess::get_local_port() {
|
||||
if(_sock){
|
||||
return _sock->get_local_port();
|
||||
if (_sock) {
|
||||
return _sock->get_local_port();
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
string RtpProcess::getIdentifier() const{
|
||||
string RtpProcess::getIdentifier() const {
|
||||
return _media_info._streamid;
|
||||
}
|
||||
|
||||
int RtpProcess::totalReaderCount(){
|
||||
int RtpProcess::totalReaderCount() {
|
||||
return _muxer ? _muxer->totalReaderCount() : 0;
|
||||
}
|
||||
|
||||
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener){
|
||||
if(_muxer){
|
||||
void RtpProcess::setListener(const std::weak_ptr<MediaSourceEvent> &listener) {
|
||||
if (_muxer) {
|
||||
_muxer->setMediaListener(listener);
|
||||
}else{
|
||||
} else {
|
||||
_listener = listener;
|
||||
}
|
||||
}
|
||||
@@ -262,7 +204,7 @@ void RtpProcess::emitOnPublish() {
|
||||
|
||||
//触发推流鉴权事件
|
||||
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPublish, _media_info, invoker, static_cast<SockInfo &>(*this));
|
||||
if(!flag){
|
||||
if (!flag) {
|
||||
//该事件无人监听,默认不鉴权
|
||||
GET_CONFIG(bool, toHls, General::kPublishToHls);
|
||||
GET_CONFIG(bool, toMP4, General::kPublishToMP4);
|
||||
|
||||
Reference in New Issue
Block a user