mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2026-07-01 23:57:37 +08:00
新增语音对讲接口(startSendRtpTalk)
This commit is contained in:
@@ -190,6 +190,25 @@ void RtpSender::startSend(const MediaSourceEvent::SendRtpArgs &args, const funct
|
||||
}
|
||||
}, delay_ms / 1000.0, "::", args.src_port);
|
||||
InfoL << "start tcp active send rtp to: " << args.dst_url << ":" << args.dst_port;
|
||||
} else if (args.con_type == MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
|
||||
auto src = MediaSource::find(args.recv_stream_vhost, args.recv_stream_app, args.recv_stream_id);
|
||||
if (!src) {
|
||||
cb(0, SockException(Err_other, "can not find the target stream"));
|
||||
return;
|
||||
}
|
||||
auto processor = src->getRtpProcess();
|
||||
if (!processor) {
|
||||
cb(0, SockException(Err_other, "get rtp processor from target stream failed"));
|
||||
return;
|
||||
}
|
||||
auto sock = processor->getSock();
|
||||
if (!sock) {
|
||||
cb(0, SockException(Err_other, "get sock from rtp processor failed"));
|
||||
return;
|
||||
}
|
||||
_socket_rtp = std::move(sock);
|
||||
onConnect();
|
||||
cb(_socket_rtp->get_local_port(), SockException());
|
||||
} else {
|
||||
CHECK(0, "invalid con type");
|
||||
}
|
||||
@@ -249,48 +268,51 @@ void RtpSender::onConnect() {
|
||||
// 加大发送缓存,防止udp丢包之类的问题 [AUTO-TRANSLATED:6e1cb40a]
|
||||
// Increase the send buffer to prevent problems such as UDP packet loss
|
||||
SockUtil::setSendBuf(_socket_rtp->rawFD(), 4 * 1024 * 1024);
|
||||
if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) {
|
||||
// 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378]
|
||||
// Close TCP no_delay and enable MSG_MORE to improve sending performance
|
||||
SockUtil::setNoDelay(_socket_rtp->rawFD(), false);
|
||||
_socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
|
||||
} else if (_args.udp_rtcp_timeout) {
|
||||
createRtcpSocket();
|
||||
}
|
||||
// 连接建立成功事件 [AUTO-TRANSLATED:ac279c86]
|
||||
// Connection established successfully event
|
||||
weak_ptr<RtpSender> weak_self = shared_from_this();
|
||||
if (!_args.recv_stream_id.empty()) {
|
||||
mINI ini;
|
||||
ini[RtpSession::kStreamID] = _args.recv_stream_id;
|
||||
// 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663]
|
||||
// Force synchronization of the app and vhost of the receive stream and send stream
|
||||
ini[RtpSession::kApp] = _args.recv_stream_app;
|
||||
ini[RtpSession::kVhost] = _args.recv_stream_vhost;
|
||||
_rtp_session = std::make_shared<RtpSession>(_socket_rtp);
|
||||
_rtp_session->setParams(ini);
|
||||
|
||||
_socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
|
||||
if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
|
||||
if (_args.con_type == MediaSourceEvent::SendRtpArgs::kTcpActive || _args.con_type == MediaSourceEvent::SendRtpArgs::kTcpPassive) {
|
||||
// 关闭tcp no_delay并开启MSG_MORE, 提高发送性能 [AUTO-TRANSLATED:c0f4e378]
|
||||
// Close TCP no_delay and enable MSG_MORE to improve sending performance
|
||||
SockUtil::setNoDelay(_socket_rtp->rawFD(), false);
|
||||
_socket_rtp->setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
|
||||
} else if (_args.udp_rtcp_timeout) {
|
||||
createRtcpSocket();
|
||||
}
|
||||
// 连接建立成功事件 [AUTO-TRANSLATED:ac279c86]
|
||||
// Connection established successfully event
|
||||
weak_ptr<RtpSender> weak_self = shared_from_this();
|
||||
if (!_args.recv_stream_id.empty()) {
|
||||
mINI ini;
|
||||
ini[RtpSession::kStreamID] = _args.recv_stream_id;
|
||||
// 强制同步接收流和发送流的app和vhost [AUTO-TRANSLATED:134c9663]
|
||||
// Force synchronization of the app and vhost of the receive stream and send stream
|
||||
ini[RtpSession::kApp] = _args.recv_stream_app;
|
||||
ini[RtpSession::kVhost] = _args.recv_stream_vhost;
|
||||
_rtp_session = std::make_shared<RtpSession>(_socket_rtp);
|
||||
_rtp_session->setParams(ini);
|
||||
|
||||
_socket_rtp->setOnRead([weak_self](const Buffer::Ptr &buf, struct sockaddr *addr, int addr_len) {
|
||||
auto strong_self = weak_self.lock();
|
||||
if (!strong_self) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
strong_self->_rtp_session->onRecv(buf);
|
||||
} catch (std::exception &ex) {
|
||||
SockException err(toolkit::Err_shutdown, ex.what());
|
||||
strong_self->_rtp_session->shutdown(err);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
_socket_rtp->setOnRead(nullptr);
|
||||
}
|
||||
_socket_rtp->setOnErr([weak_self](const SockException &err) {
|
||||
auto strong_self = weak_self.lock();
|
||||
if (!strong_self) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
strong_self->_rtp_session->onRecv(buf);
|
||||
} catch (std::exception &ex) {
|
||||
SockException err(toolkit::Err_shutdown, ex.what());
|
||||
strong_self->_rtp_session->shutdown(err);
|
||||
if (strong_self) {
|
||||
strong_self->onErr(err);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
_socket_rtp->setOnRead(nullptr);
|
||||
}
|
||||
_socket_rtp->setOnErr([weak_self](const SockException &err) {
|
||||
auto strong_self = weak_self.lock();
|
||||
if (strong_self) {
|
||||
strong_self->onErr(err);
|
||||
}
|
||||
});
|
||||
InfoL << "startSend rtp success: " << _socket_rtp->get_peer_ip() << ":" << _socket_rtp->get_peer_port() << ", data_type: " << _args.data_type << ", con_type: " << _args.con_type;
|
||||
}
|
||||
|
||||
@@ -378,28 +400,51 @@ void RtpSender::onFlushRtpList(shared_ptr<List<Buffer::Ptr>> rtp_list) {
|
||||
return;
|
||||
}
|
||||
|
||||
size_t i = 0;
|
||||
auto size = rtp_list->size();
|
||||
rtp_list->for_each([&](Buffer::Ptr &packet) {
|
||||
switch (_args.con_type) {
|
||||
case MediaSourceEvent::SendRtpArgs::kUdpActive:
|
||||
case MediaSourceEvent::SendRtpArgs::kUdpPassive: {
|
||||
onSendRtpUdp(packet, i == 0);
|
||||
// udp模式,rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b]
|
||||
// UDP mode, the first 4 bytes of rtp over tcp can be ignored
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
|
||||
break;
|
||||
auto send_func = [this](const shared_ptr<List<Buffer::Ptr>> &rtp_list) {
|
||||
size_t i = 0;
|
||||
auto size = rtp_list->size();
|
||||
rtp_list->for_each([&](Buffer::Ptr &packet) {
|
||||
switch (_args.con_type) {
|
||||
case MediaSourceEvent::SendRtpArgs::kUdpActive:
|
||||
case MediaSourceEvent::SendRtpArgs::kUdpPassive: {
|
||||
onSendRtpUdp(packet, i == 0);
|
||||
// udp模式,rtp over tcp前4个字节可以忽略 [AUTO-TRANSLATED:5d648f4b]
|
||||
// UDP mode, the first 4 bytes of rtp over tcp can be ignored
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
|
||||
break;
|
||||
}
|
||||
case MediaSourceEvent::SendRtpArgs::kTcpActive:
|
||||
case MediaSourceEvent::SendRtpArgs::kTcpPassive: {
|
||||
// tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a]
|
||||
// TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
|
||||
break;
|
||||
}
|
||||
case MediaSourceEvent::SendRtpArgs::kVoiceTalk: {
|
||||
auto type = _socket_rtp->alive() ? _socket_rtp->sockType() : SockNum::Sock_Invalid;
|
||||
if (type == SockNum::Sock_UDP) {
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), RtpPacket::kRtpTcpHeaderSize), nullptr, 0, ++i == size);
|
||||
} else if (type == SockNum::Sock_TCP) {
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
|
||||
} else {
|
||||
onErr(SockException(Err_other, "dst socket disconnected"));
|
||||
}
|
||||
break;
|
||||
}
|
||||
default: CHECK(0);
|
||||
}
|
||||
case MediaSourceEvent::SendRtpArgs::kTcpActive:
|
||||
case MediaSourceEvent::SendRtpArgs::kTcpPassive: {
|
||||
// tcp模式, rtp over tcp前2个字节可以忽略,只保留后续rtp长度的2个字节 [AUTO-TRANSLATED:a3bc338a]
|
||||
// TCP mode, the first 2 bytes of rtp over tcp can be ignored, only the subsequent 2 bytes of rtp length are retained
|
||||
_socket_rtp->send(std::make_shared<BufferRtp>(std::move(packet), 2), nullptr, 0, ++i == size);
|
||||
break;
|
||||
});
|
||||
};
|
||||
if (_args.con_type != MediaSourceEvent::SendRtpArgs::kVoiceTalk) {
|
||||
weak_ptr<RtpSender> weak_self = shared_from_this();
|
||||
_socket_rtp->getPoller()->async([weak_self, rtp_list, send_func]() {
|
||||
if (auto strong_self = weak_self.lock()) {
|
||||
send_func(rtp_list);
|
||||
}
|
||||
default: CHECK(0);
|
||||
}
|
||||
});
|
||||
});
|
||||
} else {
|
||||
send_func(rtp_list);
|
||||
}
|
||||
}
|
||||
|
||||
void RtpSender::onErr(const SockException &ex) {
|
||||
|
||||
Reference in New Issue
Block a user