重写rtcp框架

This commit is contained in:
xiongziliang
2021-01-31 19:18:17 +08:00
parent 629c39685b
commit 5c6560f55d
16 changed files with 1617 additions and 337 deletions

View File

@@ -13,15 +13,9 @@
#include "Common/config.h"
#include "UDPServer.h"
#include "RtspSession.h"
#include "Util/mini.h"
#include "Util/MD5.h"
#include "Util/base64.h"
#include "Util/onceToken.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Network/sockutil.h"
#define RTSP_SERVER_SEND_RTCP 0
#include "Rtcp/Rtcp.h"
using namespace std;
using namespace toolkit;
@@ -171,21 +165,25 @@ void RtspSession::onWholeRtspPacket(Parser &parser) {
}
void RtspSession::onRtpPacket(const char *data, size_t len) {
if(!_push_src){
return;
}
uint8_t interleaved = data[1];
if(interleaved %2 == 0){
if (interleaved % 2 == 0) {
if (!_push_src) {
return;
}
auto track_idx = getTrackIndexByInterleaved(interleaved);
handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (unsigned char *) data + 4, len - 4);
}else{
handleOneRtp(track_idx, _sdp_track[track_idx]->_type, _sdp_track[track_idx]->_samplerate, (uint8_t *) data + 4, len - 4);
} else {
auto track_idx = getTrackIndexByInterleaved(interleaved - 1);
onRtcpPacket(track_idx, _sdp_track[track_idx], data + 4, len - 4);
}
}
void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, const char *data, size_t len){}
void RtspSession::onRtcpPacket(int track_idx, SdpTrack::Ptr &track, const char *data, size_t len){
auto rtcp_arr = RtcpHeader::loadFromBytes((char *) data, len);
for (auto &rtcp : rtcp_arr) {
_rtcp_context[track_idx]->onRtcp(rtcp);
}
}
ssize_t RtspSession::getContentLength(Parser &parser) {
if(parser.Method() == "POST"){
@@ -224,18 +222,26 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if(_media_info._app.empty() || _media_info._streamid.empty()){
//推流rtsp url必须最少两级(rtsp://host/app/stream_id)不允许莫名其妙的推流url
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, "rtsp推流url非法,最少确保两级rtsp url");
throw SockException(Err_shutdown,StrPrinter << "rtsp推流url非法:" << full_url);
static constexpr auto err = "rtsp推流url非法,最少确保两级rtsp url";
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, err);
throw SockException(Err_shutdown, StrPrinter << err << ":" << full_url);
}
SdpParser sdpParser(parser.Content());
_sessionid = makeRandStr(12);
_sdp_track = sdpParser.getAvailableTrack();
if (_sdp_track.empty()) {
//sdp无效
static constexpr auto err = "sdp中无有效track";
sendRtspResponse("403 Forbidden", {"Content-Type", "text/plain"}, err);
throw SockException(Err_shutdown,StrPrinter << err << ":" << full_url);
}
for (auto &track : _sdp_track) {
_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate));
}
_push_src = std::make_shared<RtspMediaSourceImp>(_media_info._vhost, _media_info._app, _media_info._streamid);
_push_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
_push_src->setSdp(sdpParser.toString());
sendRtspResponse("200 OK",{"Content-Base", _content_base + "/"});
}
@@ -395,11 +401,14 @@ void RtspSession::onAuthSuccess() {
strongSelf->_sdp_track = SdpParser(rtsp_src->getSdp()).getAvailableTrack();
if (strongSelf->_sdp_track.empty()) {
//该流无效
DebugL << "trackInfo,该流无效";
WarnL << "sdp中无有效track该流无效:" << rtsp_src->getSdp();
strongSelf->send_StreamNotFound();
strongSelf->shutdown(SockException(Err_shutdown,"can not find any available track in sdp"));
return;
}
for (auto &track : strongSelf->_sdp_track) {
strongSelf->_rtcp_context.emplace_back(std::make_shared<RtcpContext>(track->_samplerate));
}
strongSelf->_sessionid = makeRandStr(12);
strongSelf->_play_src = rtsp_src;
for(auto &track : strongSelf->_sdp_track){
@@ -590,15 +599,15 @@ void RtspSession::onAuthUser(const string &realm,const string &authorization){
}
}
inline void RtspSession::send_StreamNotFound() {
void RtspSession::send_StreamNotFound() {
sendRtspResponse("404 Stream Not Found",{"Connection","Close"});
}
inline void RtspSession::send_UnsupportedTransport() {
void RtspSession::send_UnsupportedTransport() {
sendRtspResponse("461 Unsupported Transport",{"Connection","Close"});
}
inline void RtspSession::send_SessionNotFound() {
void RtspSession::send_SessionNotFound() {
sendRtspResponse("454 Session Not Found",{"Connection","Close"});
}
@@ -899,7 +908,7 @@ void RtspSession::handleReq_SET_PARAMETER(const Parser &parser) {
sendRtspResponse("200 OK");
}
inline void RtspSession::send_NotAcceptable() {
void RtspSession::send_NotAcceptable() {
sendRtspResponse("406 Not Acceptable",{"Connection","Close"});
}
@@ -913,7 +922,7 @@ void RtspSession::onRtpSorted(const RtpPacket::Ptr &rtp, int track_idx) {
_push_src->onWrite(rtp, false);
}
inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) {
void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &buf, const struct sockaddr &addr) {
//这是rtcp心跳包说明播放器还存活
_alive_ticker.resetTime();
@@ -921,7 +930,7 @@ inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &bu
if (_push_src) {
//这是rtsp推流上来的rtp包
auto &ref = _sdp_track[interleaved / 2];
handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (unsigned char *) buf->data(), buf->size());
handleOneRtp(interleaved / 2, ref->_type, ref->_samplerate, (uint8_t *) buf->data(), buf->size());
} else if (!_udp_connected_flags.count(interleaved)) {
//这是rtsp播放器的rtp打洞包
_udp_connected_flags.emplace(interleaved);
@@ -937,7 +946,7 @@ inline void RtspSession::onRcvPeerUdpData(int interleaved, const Buffer::Ptr &bu
}
}
inline void RtspSession::startListenPeerUdpData(int track_idx) {
void RtspSession::startListenPeerUdpData(int track_idx) {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
auto srcIP = inet_addr(get_peer_ip().data());
auto onUdpData = [weakSelf,srcIP](const Buffer::Ptr &buf, struct sockaddr *peer_addr, int interleaved){
@@ -1052,7 +1061,7 @@ bool RtspSession::sendRtspResponse(const string &res_code, const std::initialize
return sendRtspResponse(res_code,header_map,sdp,protocol);
}
inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
int RtspSession::getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (type == _sdp_track[i]->_type) {
return i;
@@ -1064,7 +1073,7 @@ inline int RtspSession::getTrackIndexByTrackType(TrackType type) {
throw SockException(Err_shutdown, StrPrinter << "no such track with type:" << (int) type);
}
inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (controlSuffix == _sdp_track[i]->_control_surffix) {
return i;
@@ -1076,7 +1085,7 @@ inline int RtspSession::getTrackIndexByControlSuffix(const string &controlSuffix
throw SockException(Err_shutdown, StrPrinter << "no such track with suffix:" << controlSuffix);
}
inline int RtspSession::getTrackIndexByInterleaved(int interleaved){
int RtspSession::getTrackIndexByInterleaved(int interleaved){
for (unsigned int i = 0; i < _sdp_track.size(); i++) {
if (_sdp_track[i]->_interleaved == interleaved) {
return i;
@@ -1114,21 +1123,37 @@ std::shared_ptr<SockInfo> RtspSession::getOriginSock(MediaSource &sender) const
return const_cast<RtspSession *>(this)->shared_from_this();
}
inline void RtspSession::onSendRtpPacket(const RtpPacket::Ptr &pkt){
#if RTSP_SERVER_SEND_RTCP
int track_index = getTrackIndexByTrackType(pkt->type);
RtcpCounter &counter = _rtcp_counter[track_index];
counter.pktCnt += 1;
counter.octCount += (pkt->size() - pkt->offset);
void RtspSession::onBeforeRtpSorted(const RtpPacket::Ptr &rtp, int track_index){
updateRtcpContext(rtp);
}
void RtspSession::updateRtcpContext(const RtpPacket::Ptr &rtp){
int track_index = getTrackIndexByTrackType(rtp->type);
auto &rtcp_ctx = _rtcp_context[track_index];
rtcp_ctx->onRtp(rtp->sequence, rtp->timeStamp, rtp->size() - 4);
auto &ticker = _rtcp_send_tickers[track_index];
//send rtcp every 5 second
if (ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
ticker.resetTime();
//直接保存网络字节序
memcpy(&counter.time_stamp, pkt->data() + 8, 4);
sendSenderReport(_rtp_type == Rtsp::RTP_TCP, track_index);
static auto send_rtcp = [](RtspSession *thiz, int index, Buffer::Ptr ptr) {
if (thiz->_rtp_type == Rtsp::RTP_TCP) {
auto &track = thiz->_sdp_track[index];
thiz->send(makeRtpOverTcpPrefix((uint16_t)(ptr->size()), track->_interleaved + 1));
thiz->send(std::move(ptr));
} else {
thiz->_rtcp_socks[index]->send(std::move(ptr));
}
};
auto rtcp = _push_src ? rtcp_ctx->createRtcpRR(rtp->ssrc + 1, rtp->ssrc) : rtcp_ctx->createRtcpSR(rtp->ssrc + 1);
auto rtcp_sdes = RtcpSdes::create({SERVER_NAME});
rtcp_sdes->items.type = (uint8_t)SdesType::RTCP_SDES_CNAME;
rtcp_sdes->items.ssrc = htonl(rtp->ssrc);
send_rtcp(this, track_index, std::move(rtcp));
send_rtcp(this, track_index, RtcpHeader::toBuffer(rtcp_sdes));
}
#endif
}
void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
@@ -1138,7 +1163,7 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
auto size = pkt->size();
setSendFlushFlag(false);
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
onSendRtpPacket(rtp);
updateRtcpContext(rtp);
if (++i == size) {
setSendFlushFlag(true);
}
@@ -1150,7 +1175,7 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
size_t i = 0;
auto size = pkt->size();
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
onSendRtpPacket(rtp);
updateRtcpContext(rtp);
int track_index = getTrackIndexByTrackType(rtp->type);
auto &pSock = _rtp_socks[track_index];
if (!pSock) {
@@ -1168,66 +1193,6 @@ void RtspSession::sendRtpPacket(const RtspMediaSource::RingDataType &pkt) {
}
}
void RtspSession::sendSenderReport(bool over_tcp, int track_index) {
static const char s_cname[] = "ZLMediaKitRtsp";
uint8_t rtcp_buf[4 + 28 + 10 + sizeof(s_cname) + 1] = {0};
uint8_t *rtcp_sr = rtcp_buf + 4, *rtcp_sdes = rtcp_sr + 28;
auto &track = _sdp_track[track_index];
auto &counter = _rtcp_counter[track_index];
rtcp_buf[0] = '$';
rtcp_buf[1] = track->_interleaved + 1;
rtcp_buf[2] = (sizeof(rtcp_buf) - 4) >> 8;
rtcp_buf[3] = (sizeof(rtcp_buf) - 4) & 0xFF;
rtcp_sr[0] = 0x80;
rtcp_sr[1] = 0xC8;
rtcp_sr[2] = 0x00;
rtcp_sr[3] = 0x06;
uint32_t ssrc = htonl(track->_ssrc);
memcpy(&rtcp_sr[4], &ssrc, 4);
uint32_t msw;
uint32_t lsw;
struct timeval tv;
gettimeofday(&tv, NULL);
msw = tv.tv_sec + 0x83AA7E80; /* 0x83AA7E80 is the number of seconds from 1900 to 1970 */
lsw = (uint32_t) ((double) tv.tv_usec * (double) (((uint64_t) 1) << 32) * 1.0e-6);
msw = htonl(msw);
memcpy(&rtcp_sr[8], &msw, 4);
lsw = htonl(lsw);
memcpy(&rtcp_sr[12], &lsw, 4);
//直接使用网络字节序
memcpy(&rtcp_sr[16], &counter.timeStamp, 4);
uint32_t pktCnt = htonl(counter.pktCnt);
memcpy(&rtcp_sr[20], &pktCnt, 4);
uint32_t octCount = htonl(counter.octCount);
memcpy(&rtcp_sr[24], &octCount, 4);
rtcp_sdes[0] = 0x81;
rtcp_sdes[1] = 0xCA;
rtcp_sdes[2] = 0x00;
rtcp_sdes[3] = 0x06;
memcpy(&rtcp_sdes[4], &ssrc, 4);
rtcp_sdes[8] = 0x01;
rtcp_sdes[9] = 0x0f;
memcpy(&rtcp_sdes[10], s_cname, sizeof(s_cname));
rtcp_sdes[10 + sizeof(s_cname)] = 0x00;
if (over_tcp) {
send(obtainBuffer((char *) rtcp_buf, sizeof(rtcp_buf)));
} else {
_rtcp_socks[track_index]->send((char *) rtcp_buf + 4, sizeof(rtcp_buf) - 4);
}
}
void RtspSession::setSocketFlags(){
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if(mergeWriteMS > 0) {