rtp server新增支持自定义vhost和app名称 (#3693)

代码来自https://github.com/ZLMediaKit/ZLMediaKit/pull/3446 , 增加了vhost
This commit is contained in:
zhang2349
2024-07-09 10:42:10 +08:00
committed by GitHub
parent b4fecdc929
commit c72e576420
14 changed files with 252 additions and 76 deletions

View File

@@ -107,7 +107,7 @@ extern const std::string kBroadcastReloadConfig;
// rtp server 超时
extern const std::string kBroadcastRtpServerTimeout;
#define BroadcastRtpServerTimeoutArgs uint16_t &local_port, const string &stream_id,int &tcp_mode, bool &re_use_port, uint32_t &ssrc
#define BroadcastRtpServerTimeoutArgs uint16_t &local_port, const MediaTuple &tuple, int &tcp_mode, bool &re_use_port, uint32_t &ssrc
// rtc transport sctp 连接状态
extern const std::string kBroadcastRtcSctpConnecting;

View File

@@ -23,17 +23,14 @@ static constexpr size_t kMaxCachedFrameMS = 10 * 1000;
namespace mediakit {
RtpProcess::Ptr RtpProcess::createProcess(std::string stream_id) {
RtpProcess::Ptr ret(new RtpProcess(std::move(stream_id)));
ret->createTimer();
return ret;
RtpProcess::Ptr RtpProcess::createProcess(const MediaTuple &tuple) {
RtpProcess::Ptr ret(new RtpProcess(tuple));
ret->createTimer();
return ret;
}
RtpProcess::RtpProcess(string stream_id) {
_media_info.schema = kRtpAppName;
_media_info.vhost = DEFAULT_VHOST;
_media_info.app = kRtpAppName;
_media_info.stream = std::move(stream_id);
RtpProcess::RtpProcess(const MediaTuple &tuple) {
static_cast<MediaTuple &>(_media_info) = tuple;
GET_CONFIG(string, dump_dir, RtpProxy::kDumpDir);
{

View File

@@ -25,7 +25,7 @@ public:
using Ptr = std::shared_ptr<RtpProcess>;
using onDetachCB = std::function<void(const toolkit::SockException &ex)>;
static Ptr createProcess(std::string stream_id);
static Ptr createProcess(const MediaTuple &tuple);
~RtpProcess();
enum OnlyTrack { kAll = 0, kOnlyAudio = 1, kOnlyVideo = 2 };
@@ -91,7 +91,7 @@ protected:
bool close(mediakit::MediaSource &sender) override;
private:
RtpProcess(std::string stream_id);
RtpProcess(const MediaTuple &tuple);
void emitOnPublish();
void doCachedFunc();

View File

@@ -30,18 +30,18 @@ class RtcpHelper: public std::enable_shared_from_this<RtcpHelper> {
public:
using Ptr = std::shared_ptr<RtcpHelper>;
RtcpHelper(Socket::Ptr rtcp_sock, std::string stream_id) {
RtcpHelper(Socket::Ptr rtcp_sock, MediaTuple tuple) {
_rtcp_sock = std::move(rtcp_sock);
_stream_id = std::move(stream_id);
_tuple = std::move(tuple);
}
void setRtpServerInfo(uint16_t local_port, RtpServer::TcpMode mode, bool re_use_port, uint32_t ssrc, int only_track) {
_ssrc = ssrc;
_process = RtpProcess::createProcess(_stream_id);
_process = RtpProcess::createProcess(_tuple);
_process->setOnlyTrack((RtpProcess::OnlyTrack)only_track);
_timeout_cb = [=]() mutable {
NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, local_port, _stream_id, (int)mode, re_use_port, ssrc);
NOTICE_EMIT(BroadcastRtpServerTimeoutArgs, Broadcast::kBroadcastRtpServerTimeout, local_port, _tuple, (int)mode, re_use_port, ssrc);
};
weak_ptr<RtcpHelper> weak_self = shared_from_this();
@@ -117,12 +117,12 @@ private:
Ticker _ticker;
Socket::Ptr _rtcp_sock;
RtpProcess::Ptr _process;
std::string _stream_id;
MediaTuple _tuple;
RtpProcess::onDetachCB _on_detach;
std::shared_ptr<struct sockaddr_storage> _rtcp_addr;
};
void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
void RtpServer::start(uint16_t local_port, const MediaTuple &tuple, TcpMode tcp_mode, const char *local_ip, bool re_use_port, uint32_t ssrc, int only_track, bool multiplex) {
//创建udp服务器
auto poller = EventPollerPool::Instance().getPoller();
Socket::Ptr rtp_socket = Socket::createSocket(poller, true);
@@ -148,9 +148,9 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
UdpServer::Ptr udp_server;
RtcpHelper::Ptr helper;
//增加了多路复用判断如果多路复用为true就走else逻辑同时保留了原来stream_id为空走else逻辑
if (!stream_id.empty() && !multiplex) {
if (!tuple.stream.empty() && !multiplex) {
//指定了流id那么一个端口一个流(不管是否包含多个ssrc的多个流绑定rtp源后会筛选掉ip端口不匹配的流)
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), stream_id);
helper = std::make_shared<RtcpHelper>(std::move(rtcp_socket), tuple);
helper->startRtcp();
helper->setRtpServerInfo(local_port, tcp_mode, re_use_port, ssrc, only_track);
bool bind_peer_addr = false;
@@ -185,7 +185,9 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
auto processor = helper ? helper->getProcess() : nullptr;
// 如果共享同一个processor对象那么tcp server深圳为单线程模式确保线程安全
tcp_server = std::make_shared<TcpServer>(processor ? poller : nullptr);
(*tcp_server)[RtpSession::kStreamID] = stream_id;
(*tcp_server)[RtpSession::kVhost] = tuple.vhost;
(*tcp_server)[RtpSession::kApp] = tuple.app;
(*tcp_server)[RtpSession::kStreamID] = tuple.stream;
(*tcp_server)[RtpSession::kSSRC] = ssrc;
(*tcp_server)[RtpSession::kOnlyTrack] = only_track;
if (tcp_mode == PASSIVE) {
@@ -193,13 +195,13 @@ void RtpServer::start(uint16_t local_port, const string &stream_id, TcpMode tcp_
tcp_server->start<RtpSession>(local_port, local_ip, 1024, [weak_self, processor](std::shared_ptr<RtpSession> &session) {
session->setRtpProcess(processor);
});
} else if (stream_id.empty()) {
} else if (tuple.stream.empty()) {
// tcp主动模式时只能一个端口一个流必须指定流id; 创建TcpServer对象也仅用于传参
throw std::runtime_error(StrPrinter << "tcp主动模式时必需指定流id");
}
}
_on_cleanup = [rtp_socket, stream_id]() {
_on_cleanup = [rtp_socket]() {
if (rtp_socket) {
//去除循环引用
rtp_socket->setOnRead(nullptr);

View File

@@ -43,7 +43,7 @@ public:
* @param ssrc 指定的ssrc
* @param multiplex 多路复用
*/
void start(uint16_t local_port, const std::string &stream_id = "", TcpMode tcp_mode = PASSIVE,
void start(uint16_t local_port, const MediaTuple &tuple = MediaTuple{DEFAULT_VHOST, kRtpAppName, "", ""}, TcpMode tcp_mode = PASSIVE,
const char *local_ip = "::", bool re_use_port = true, uint32_t ssrc = 0, int only_track = 0, bool multiplex = false);
/**

View File

@@ -21,6 +21,8 @@ using namespace toolkit;
namespace mediakit{
const string RtpSession::kVhost = "vhost";
const string RtpSession::kApp = "app";
const string RtpSession::kStreamID = "stream_id";
const string RtpSession::kSSRC = "ssrc";
const string RtpSession::kOnlyTrack = "only_track";
@@ -31,7 +33,9 @@ void RtpSession::attachServer(const Server &server) {
}
void RtpSession::setParams(mINI &ini) {
_stream_id = ini[kStreamID];
_tuple.vhost = ini[kVhost];
_tuple.app = ini[kApp];
_tuple.stream = ini[kStreamID];
_ssrc = ini[kSSRC];
_only_track = ini[kOnlyTrack];
int udp_socket_buffer = ini[kUdpRecvBuffer];
@@ -63,7 +67,7 @@ void RtpSession::onError(const SockException &err) {
if (_emit_detach) {
_process->onDetach(err);
}
WarnP(this) << _stream_id << " " << err;
WarnP(this) << _tuple.shortUrl() << " " << err;
}
void RtpSession::onManager() {
@@ -107,12 +111,12 @@ void RtpSession::onRtpPacket(const char *data, size_t len) {
}
// 未指定流id就使用ssrc为流id
if (_stream_id.empty()) {
_stream_id = printSSRC(_ssrc);
if (_tuple.stream.empty()) {
_tuple.stream = printSSRC(_ssrc);
}
if (!_process) {
_process = RtpProcess::createProcess(_stream_id);
_process = RtpProcess::createProcess(_tuple);
_process->setOnlyTrack((RtpProcess::OnlyTrack)_only_track);
weak_ptr<RtpSession> weak_self = static_pointer_cast<RtpSession>(shared_from_this());
_process->setOnDetach([weak_self](const SockException &ex) {

View File

@@ -22,6 +22,8 @@ namespace mediakit{
class RtpSession : public toolkit::Session, public RtpSplitter {
public:
static const std::string kVhost;
static const std::string kApp;
static const std::string kStreamID;
static const std::string kSSRC;
static const std::string kOnlyTrack;
@@ -54,7 +56,7 @@ private:
int _only_track = 0;
uint32_t _ssrc = 0;
toolkit::Ticker _ticker;
std::string _stream_id;
MediaTuple _tuple;
struct sockaddr_storage _addr;
RtpProcess::Ptr _process;
};