进一步抽象ts/ps解析代码

This commit is contained in:
xiongziliang
2020-05-17 18:00:23 +08:00
parent cf599167c1
commit 198f223d63
6 changed files with 283 additions and 208 deletions

View File

@@ -9,44 +9,13 @@
*/
#if defined(ENABLE_RTPPROXY)
#include "mpeg-ts-proto.h"
#include "RtpProcess.h"
#include "Util/File.h"
#include "Extension/H265.h"
#include "Extension/AAC.h"
#include "Extension/G711.h"
#include "Http/HttpTSPlayer.h"
#define RTP_APP_NAME "rtp"
namespace mediakit{
/**
* 合并一些时间戳相同的frame
*/
class FrameMerger {
public:
FrameMerger() = default;
virtual ~FrameMerger() = default;
void inputFrame(const Frame::Ptr &frame,const function<void(uint32_t dts,uint32_t pts,const Buffer::Ptr &buffer)> &cb){
if (!_frameCached.empty() && _frameCached.back()->dts() != frame->dts()) {
Frame::Ptr back = _frameCached.back();
Buffer::Ptr merged_frame = back;
if(_frameCached.size() != 1){
string merged;
_frameCached.for_each([&](const Frame::Ptr &frame){
merged.append(frame->data(),frame->size());
});
merged_frame = std::make_shared<BufferString>(std::move(merged));
}
cb(back->dts(),back->pts(),merged_frame);
_frameCached.clear();
}
_frameCached.emplace_back(Frame::getCacheAbleFrame(frame));
}
private:
List<Frame::Ptr> _frameCached;
};
string printSSRC(uint32_t ui32Ssrc) {
char tmp[9] = { 0 };
ui32Ssrc = htonl(ui32Ssrc);
@@ -101,7 +70,6 @@ RtpProcess::RtpProcess(uint32_t ssrc) {
});
}
}
_merger = std::make_shared<FrameMerger>();
}
RtpProcess::~RtpProcess() {
@@ -157,7 +125,7 @@ bool RtpProcess::inputRtp(const Socket::Ptr &sock, const char *data, int data_le
//判断是否为ts负载
static inline bool checkTS(const uint8_t *packet, int bytes){
return bytes % 188 == 0 && packet[0] == 0x47;
return bytes % TS_PACKET_SIZE == 0 && packet[0] == TS_SYNC_BYTE;
}
void RtpProcess::onRtpSorted(const RtpPacket::Ptr &rtp, int) {
@@ -179,153 +147,37 @@ void RtpProcess::onRtpDecode(const uint8_t *packet, int bytes, uint32_t timestam
fwrite((uint8_t *)packet,bytes, 1, _save_file_ps.get());
}
if(!_decoder){
if (!_decoder) {
//创建解码器
if(checkTS(packet, bytes)){
if (checkTS(packet, bytes)) {
//猜测是ts负载
InfoP(this) << "judged to be TS";
_decoder = Decoder::createDecoder(Decoder::decoder_ts);
}else{
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ts, shared_from_this());
} else {
//猜测是ps负载
InfoP(this) << "judged to be PS";
_decoder = Decoder::createDecoder(Decoder::decoder_ps);
_decoder = DecoderImp::createDecoder(DecoderImp::decoder_ps, shared_from_this());
}
_decoder->setOnDecode([this](int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes){
onDecode(stream,codecid,flags,pts,dts,data,bytes);
});
}
auto ret = _decoder->input((uint8_t *)packet,bytes);
if(ret != bytes){
WarnP(this) << ret << " != " << bytes << " " << flags;
if (_decoder) {
auto ret = _decoder->input((uint8_t *) packet, bytes);
if (ret != bytes) {
WarnP(this) << ret << " != " << bytes << " " << flags;
}
}
}
#define SWITCH_CASE(codec_id) case codec_id : return #codec_id
static const char *getCodecName(int codec_id) {
switch (codec_id) {
SWITCH_CASE(PSI_STREAM_MPEG1);
SWITCH_CASE(PSI_STREAM_MPEG2);
SWITCH_CASE(PSI_STREAM_AUDIO_MPEG1);
SWITCH_CASE(PSI_STREAM_MP3);
SWITCH_CASE(PSI_STREAM_AAC);
SWITCH_CASE(PSI_STREAM_MPEG4);
SWITCH_CASE(PSI_STREAM_MPEG4_AAC_LATM);
SWITCH_CASE(PSI_STREAM_H264);
SWITCH_CASE(PSI_STREAM_MPEG4_AAC);
SWITCH_CASE(PSI_STREAM_H265);
SWITCH_CASE(PSI_STREAM_AUDIO_AC3);
SWITCH_CASE(PSI_STREAM_AUDIO_EAC3);
SWITCH_CASE(PSI_STREAM_AUDIO_DTS);
SWITCH_CASE(PSI_STREAM_VIDEO_DIRAC);
SWITCH_CASE(PSI_STREAM_VIDEO_VC1);
SWITCH_CASE(PSI_STREAM_VIDEO_SVAC);
SWITCH_CASE(PSI_STREAM_AUDIO_SVAC);
SWITCH_CASE(PSI_STREAM_AUDIO_G711A);
SWITCH_CASE(PSI_STREAM_AUDIO_G711U);
SWITCH_CASE(PSI_STREAM_AUDIO_G722);
SWITCH_CASE(PSI_STREAM_AUDIO_G723);
SWITCH_CASE(PSI_STREAM_AUDIO_G729);
default : return "unknown codec";
void RtpProcess::inputFrame(const Frame::Ptr &frame){
_dts = frame->dts();
if (_save_file_video && frame->getTrackType() == TrackVideo) {
fwrite((uint8_t *) frame->data(), frame->size(), 1, _save_file_video.get());
}
_muxer->inputFrame(frame);
}
void RtpProcess::onDecode(int stream,int codecid,int flags,int64_t pts,int64_t dts,const void *data,int bytes) {
pts /= 90;
dts /= 90;
_dts = dts;
switch (codecid) {
case PSI_STREAM_H264: {
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoP(this) << "got video track: H264";
auto track = std::make_shared<H264Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
WarnP(this) << "video track change to H264 from codecid:" << getCodecName(_codecid_video);
return;
}
if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
}
auto frame = std::make_shared<H264FrameNoCacheAble>((char *) data, bytes, dts, pts,0);
_merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H264FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts,4));
});
break;
}
case PSI_STREAM_H265: {
if (!_codecid_video) {
//获取到视频
_codecid_video = codecid;
InfoP(this) << "got video track: H265";
auto track = std::make_shared<H265Track>();
_muxer->addTrack(track);
}
if (codecid != _codecid_video) {
WarnP(this) << "video track change to H265 from codecid:" << getCodecName(_codecid_video);
return;
}
if(_save_file_video){
fwrite((uint8_t *)data,bytes, 1, _save_file_video.get());
}
auto frame = std::make_shared<H265FrameNoCacheAble>((char *) data, bytes, dts, pts, 0);
_merger->inputFrame(frame,[this](uint32_t dts, uint32_t pts, const Buffer::Ptr &buffer) {
_muxer->inputFrame(std::make_shared<H265FrameNoCacheAble>(buffer->data(), buffer->size(), dts, pts, 4));
});
break;
}
case PSI_STREAM_AAC: {
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
InfoP(this) << "got audio track: AAC";
auto track = std::make_shared<AACTrack>();
_muxer->addTrack(track);
}
if (codecid != _codecid_audio) {
WarnP(this) << "audio track change to AAC from codecid:" << getCodecName(_codecid_audio);
return;
}
_muxer->inputFrame(std::make_shared<AACFrameNoCacheAble>((char *) data, bytes, dts, 0, 7));
break;
}
case PSI_STREAM_AUDIO_G711A:
case PSI_STREAM_AUDIO_G711U: {
auto codec = codecid == PSI_STREAM_AUDIO_G711A ? CodecG711A : CodecG711U;
if (!_codecid_audio) {
//获取到音频
_codecid_audio = codecid;
InfoP(this) << "got audio track: G711";
//G711传统只支持 8000/1/16的规格FFmpeg貌似做了扩展但是这里不管它了
auto track = std::make_shared<G711Track>(codec, 8000, 1, 16);
_muxer->addTrack(track);
}
if (codecid != _codecid_audio) {
WarnP(this) << "audio track change to G711 from codecid:" << getCodecName(_codecid_audio);
return;
}
auto frame = std::make_shared<G711FrameNoCacheAble>((char *) data, bytes, dts);
frame->setCodec(codec);
_muxer->inputFrame(frame);
break;
}
default:
if(codecid != 0){
WarnP(this) << "unsupported codec type:" << getCodecName(codecid) << " " << (int)codecid;
}
return;
}
void RtpProcess::addTrack(const Track::Ptr & track){
_muxer->addTrack(track);
}
bool RtpProcess::alive() {
@@ -410,6 +262,5 @@ void RtpProcess::emitOnPublish() {
}
}
}//namespace mediakit
#endif//defined(ENABLE_RTPPROXY)