GB28181单端口接收流支持多线程

This commit is contained in:
ziyue
2021-06-08 14:03:25 +08:00
parent 92736db5b2
commit c4817c6d5f
5 changed files with 85 additions and 51 deletions

View File

@@ -16,16 +16,27 @@
namespace mediakit{
const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kIsUDP = "is_udp";
void RtpSession::attachServer(const Server &server) {
_stream_id = const_cast<Server &>(server)[kStreamID];
_is_udp = const_cast<Server &>(server)[kIsUDP];
if (_is_udp) {
//设置udp socket读缓存
SockUtil::setRecvBuf(getSock()->rawFD(), 4 * 1024 * 1024);
_statistic_udp = std::make_shared<ObjectStatistic<UdpSession> >();
} else {
_statistic_tcp = std::make_shared<ObjectStatistic<TcpSession> >();
}
}
RtpSession::RtpSession(const Socket::Ptr &sock) : TcpSession(sock) {
RtpSession::RtpSession(const Socket::Ptr &sock) : Session(sock) {
DebugP(this);
socklen_t addr_len = sizeof(addr);
getpeername(sock->rawFD(), &addr, &addr_len);
socklen_t addr_len = sizeof(_addr);
getpeername(sock->rawFD(), &_addr, &addr_len);
}
RtpSession::~RtpSession() {
DebugP(this);
if(_process){
@@ -35,6 +46,10 @@ RtpSession::~RtpSession() {
void RtpSession::onRecv(const Buffer::Ptr &data) {
try {
if (_is_udp) {
onRtpPacket(data->data(), data->size());
return;
}
RtpSplitter::input(data->data(), data->size());
} catch (SockException &ex) {
shutdown(ex);
@@ -58,19 +73,21 @@ void RtpSession::onManager() {
}
void RtpSession::onRtpPacket(const char *data, size_t len) {
if (_search_rtp) {
//搜索上下文期间,数据丢弃
if (_search_rtp_finished) {
//下个包开始就是正确的rtp包了
_search_rtp_finished = false;
_search_rtp = false;
if (!_is_udp) {
if (_search_rtp) {
//搜索上下文期间,数据丢弃
if (_search_rtp_finished) {
//下个包开始就是正确的rtp包了
_search_rtp_finished = false;
_search_rtp = false;
}
return;
}
if (len > 1024 * 10) {
_search_rtp = true;
WarnL << "rtp包长度异常(" << len << ")发送端可能缓存溢出并覆盖开始搜索ssrc以便恢复上下文";
return;
}
return;
}
if (len > 1024 * 10) {
_search_rtp = true;
WarnL << "rtp包长度异常(" << len << ")发送端可能缓存溢出并覆盖开始搜索ssrc以便恢复上下文";
return;
}
if (!_process) {
if (!RtpSelector::getSSRC(data, len, _ssrc)) {
@@ -85,10 +102,14 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
_process->setListener(dynamic_pointer_cast<RtpSession>(shared_from_this()));
}
try {
_process->inputRtp(false, getSock(), data, len, &addr);
_process->inputRtp(false, getSock(), data, len, &_addr);
} catch (RtpReceiver::BadRtpException &ex) {
WarnL << ex.what() << "开始搜索ssrc以便恢复上下文";
_search_rtp = true;
if (!_is_udp) {
WarnL << ex.what() << "开始搜索ssrc以便恢复上下文";
_search_rtp = true;
} else {
throw;
}
} catch (...) {
throw;
}