适配ZLToolKit代码,支持自定义创建Socket:#468

This commit is contained in:
xiongziliang
2020-09-12 19:03:52 +08:00
parent 9a088f4825
commit c711eedaa7
25 changed files with 331 additions and 278 deletions

View File

@@ -25,153 +25,177 @@ MultiCastAddressMaker &MultiCastAddressMaker::Instance() {
return instance;
}
static uint32_t addressToInt(const string &ip){
struct in_addr addr;
bzero(&addr,sizeof(addr));
addr.s_addr = inet_addr(ip.data());
return (uint32_t)ntohl((uint32_t &)addr.s_addr);
bool MultiCastAddressMaker::isMultiCastAddress(uint32_t addr) {
static uint32_t addrMin = mINI::Instance()[MultiCast::kAddrMin].as<uint32_t>();
static uint32_t addrMax = mINI::Instance()[MultiCast::kAddrMax].as<uint32_t>();
return addr >= addrMin && addr <= addrMax;
}
std::shared_ptr<uint32_t> MultiCastAddressMaker::obtain(uint32_t iTry) {
string MultiCastAddressMaker::toString(uint32_t addr) {
addr = htonl(addr);
return SockUtil::inet_ntoa((struct in_addr &) (addr));
}
static uint32_t addressToInt(const string &ip){
struct in_addr addr;
bzero(&addr, sizeof(addr));
addr.s_addr = inet_addr(ip.data());
return (uint32_t) ntohl((uint32_t &) addr.s_addr);
}
std::shared_ptr<uint32_t> MultiCastAddressMaker::obtain(uint32_t max_try) {
lock_guard<recursive_mutex> lck(_mtx);
GET_CONFIG(string,addrMinStr,MultiCast::kAddrMin);
GET_CONFIG(string,addrMaxStr,MultiCast::kAddrMax);
GET_CONFIG(string, addrMinStr, MultiCast::kAddrMin);
GET_CONFIG(string, addrMaxStr, MultiCast::kAddrMax);
uint32_t addrMin = addressToInt(addrMinStr);
uint32_t addrMax = addressToInt(addrMaxStr);
if(_iAddr > addrMax || _iAddr == 0){
_iAddr = addrMin;
if (_addr > addrMax || _addr == 0) {
_addr = addrMin;
}
auto iGotAddr = _iAddr++;
if(_setBadAddr.find(iGotAddr) != _setBadAddr.end()){
auto iGotAddr = _addr++;
if (_used_addr.find(iGotAddr) != _used_addr.end()) {
//已经分配过了
if(iTry){
return obtain(--iTry);
if (max_try) {
return obtain(--max_try);
}
//分配完了,应该不可能到这里
ErrorL;
return nullptr;
}
_setBadAddr.emplace(iGotAddr);
std::shared_ptr<uint32_t> ret(new uint32_t(iGotAddr),[](uint32_t *ptr){
_used_addr.emplace(iGotAddr);
std::shared_ptr<uint32_t> ret(new uint32_t(iGotAddr), [](uint32_t *ptr) {
MultiCastAddressMaker::Instance().release(*ptr);
delete ptr;
});
return ret;
}
void MultiCastAddressMaker::release(uint32_t iAddr){
void MultiCastAddressMaker::release(uint32_t addr){
lock_guard<recursive_mutex> lck(_mtx);
_setBadAddr.erase(iAddr);
_used_addr.erase(addr);
}
////////////////////////////////////////////////////////////////////////////////////
recursive_mutex RtpMultiCaster::g_mtx;
unordered_map<string, weak_ptr<RtpMultiCaster> > RtpMultiCaster::g_mapBroadCaster;
recursive_mutex g_mtx;
unordered_map<string, weak_ptr<RtpMultiCaster> > g_multi_caster_map;
void RtpMultiCaster::setDetachCB(void* listener, const onDetach& cb) {
lock_guard<recursive_mutex> lck(_mtx);
if(cb){
_mapDetach.emplace(listener,cb);
}else{
_mapDetach.erase(listener);
if (cb) {
_detach_map.emplace(listener, cb);
} else {
_detach_map.erase(listener);
}
}
RtpMultiCaster::~RtpMultiCaster() {
_pReader->setReadCB(nullptr);
_pReader->setDetachCB(nullptr);
_rtp_reader->setReadCB(nullptr);
_rtp_reader->setDetachCB(nullptr);
DebugL;
}
RtpMultiCaster::RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA,strVhost,strApp, strStream));
if(!src){
auto strErr = StrPrinter << "未找到媒体源:" << strVhost << " " << strApp << " " << strStream << endl;
throw std::runtime_error(strErr);
RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, vhost, app, stream));
if (!src) {
auto err = StrPrinter << "未找到媒体源:" << vhost << " " << app << " " << stream << endl;
throw std::runtime_error(err);
}
_multicast_ip = MultiCastAddressMaker::Instance().obtain();
if (!_multicast_ip) {
throw std::runtime_error("获取组播地址失败");
}
_multiAddr = MultiCastAddressMaker::Instance().obtain();
for(auto i = 0; i < 2; i++){
_apUdpSock[i].reset(new Socket(poller));
if(!_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){
auto strErr = StrPrinter << "绑定UDP端口失败:" << strLocalIp << endl;
throw std::runtime_error(strErr);
}
auto fd = _apUdpSock[i]->rawFD();
GET_CONFIG(uint32_t,udpTTL,MultiCast::kUdpTTL);
for (auto i = 0; i < 2; ++i) {
//创建udp socket, 数组下标为TrackType
_udp_sock[i] = helper.createSocket();
if (!_udp_sock[i]->bindUdpSock(0, local_ip.data())) {
auto err = StrPrinter << "绑定UDP端口失败:" << local_ip << endl;
throw std::runtime_error(err);
}
auto fd = _udp_sock[i]->rawFD();
GET_CONFIG(uint32_t, udpTTL, MultiCast::kUdpTTL);
SockUtil::setMultiTTL(fd, udpTTL);
SockUtil::setMultiLOOP(fd, false);
SockUtil::setMultiIF(fd, strLocalIp.data());
SockUtil::setMultiIF(fd, local_ip.data());
struct sockaddr_in &peerAddr = _aPeerUdpAddr[i];
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(_apUdpSock[i]->get_local_port());
peerAddr.sin_addr.s_addr = htonl(*_multiAddr);
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
_apUdpSock[i]->setSendPeerAddr((struct sockaddr *)&peerAddr);
struct sockaddr_in peer;
peer.sin_family = AF_INET;
//组播目标端口为本地发送端口
peer.sin_port = htons(_udp_sock[i]->get_local_port());
//组播目标地址
peer.sin_addr.s_addr = htonl(*_multicast_ip);
bzero(&(peer.sin_zero), sizeof peer.sin_zero);
_udp_sock[i]->setSendPeerAddr((struct sockaddr *) &peer);
}
_pReader = src->getRing()->attach(poller);
_pReader->setReadCB([this](const RtspMediaSource::RingDataType &pkt){
_rtp_reader = src->getRing()->attach(helper.getPoller());
_rtp_reader->setReadCB([this](const RtspMediaSource::RingDataType &pkt) {
int i = 0;
int size = pkt->size();
pkt->for_each([&](const RtpPacket::Ptr &rtp) {
auto &pSock = _apUdpSock[rtp->type];
auto &peerAddr = _aPeerUdpAddr[rtp->type];
BufferRtp::Ptr buffer(new BufferRtp(rtp, 4));
pSock->send(buffer, nullptr, 0, ++i == size);
auto &sock = _udp_sock[rtp->type];
sock->send(std::make_shared<BufferRtp>(rtp, 4), nullptr, 0, ++i == size);
});
});
_pReader->setDetachCB([this](){
unordered_map<void * , onDetach > _mapDetach_copy;
_rtp_reader->setDetachCB([this]() {
unordered_map<void *, onDetach> _detach_map_copy;
{
lock_guard<recursive_mutex> lck(_mtx);
_mapDetach_copy = std::move(_mapDetach);
_detach_map_copy = std::move(_detach_map);
}
for(auto &pr : _mapDetach_copy){
for (auto &pr : _detach_map_copy) {
pr.second();
}
});
DebugL << MultiCastAddressMaker::toString(*_multiAddr) << " "
<< _apUdpSock[0]->get_local_port() << " "
<< _apUdpSock[1]->get_local_port() << " "
<< strVhost << " "
<< strApp << " " << strStream;
}
uint16_t RtpMultiCaster::getPort(TrackType trackType){
return _apUdpSock[trackType]->get_local_port();
}
string RtpMultiCaster::getIP(){
return SockUtil::inet_ntoa(_aPeerUdpAddr[0].sin_addr);
}
RtpMultiCaster::Ptr RtpMultiCaster::make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){
try{
auto ret = Ptr(new RtpMultiCaster(poller,strLocalIp,strVhost,strApp,strStream),[poller](RtpMultiCaster *ptr){
poller->async([ptr]() {
delete ptr;
});
});
lock_guard<recursive_mutex> lck(g_mtx);
string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl;
weak_ptr<RtpMultiCaster> weakPtr = ret;
g_mapBroadCaster.emplace(strKey,weakPtr);
return ret;
}catch (std::exception &ex) {
WarnL << ex.what();
return nullptr;
}
DebugL << MultiCastAddressMaker::toString(*_multicast_ip) << " "
<< _udp_sock[0]->get_local_port() << " "
<< _udp_sock[1]->get_local_port() << " "
<< vhost << " " << app << " " << stream;
}
RtpMultiCaster::Ptr RtpMultiCaster::get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) {
string strKey = StrPrinter << strLocalIp << " " << strVhost << " " << strApp << " " << strStream << endl;
uint16_t RtpMultiCaster::getMultiCasterPort(TrackType trackType) {
return _udp_sock[trackType]->get_local_port();
}
string RtpMultiCaster::getMultiCasterIP() {
struct in_addr addr;
addr.s_addr = htonl(*_multicast_ip);
return SockUtil::inet_ntoa(addr);
}
RtpMultiCaster::Ptr RtpMultiCaster::get(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream) {
static auto on_create = [](SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream){
try {
auto poller = helper.getPoller();
auto ret = RtpMultiCaster::Ptr(new RtpMultiCaster(helper, local_ip, vhost, app, stream), [poller](RtpMultiCaster *ptr) {
poller->async([ptr]() {
delete ptr;
});
});
lock_guard<recursive_mutex> lck(g_mtx);
string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl;
g_multi_caster_map.emplace(strKey, ret);
return ret;
} catch (std::exception &ex) {
WarnL << ex.what();
return RtpMultiCaster::Ptr();
}
};
string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl;
lock_guard<recursive_mutex> lck(g_mtx);
auto it = g_mapBroadCaster.find(strKey);
if (it == g_mapBroadCaster.end()) {
return make(poller,strLocalIp,strVhost,strApp, strStream);
auto it = g_multi_caster_map.find(strKey);
if (it == g_multi_caster_map.end()) {
return on_create(helper, local_ip, vhost, app, stream);
}
auto ret = it->second.lock();
if (!ret) {
g_mapBroadCaster.erase(it);
return make(poller,strLocalIp,strVhost,strApp, strStream);
g_multi_caster_map.erase(it);
return on_create(helper, local_ip, vhost, app, stream);
}
return ret;
}

View File

@@ -11,7 +11,6 @@
#ifndef SRC_RTSP_RTPBROADCASTER_H_
#define SRC_RTSP_RTPBROADCASTER_H_
#include <mutex>
#include <memory>
#include <unordered_set>
@@ -20,60 +19,52 @@
#include "RtspMediaSource.h"
#include "Util/mini.h"
#include "Network/Socket.h"
using namespace std;
using namespace toolkit;
namespace mediakit{
class MultiCastAddressMaker
{
class MultiCastAddressMaker {
public:
static MultiCastAddressMaker &Instance();
~MultiCastAddressMaker() {}
static MultiCastAddressMaker& Instance();
static bool isMultiCastAddress(uint32_t addr);
static string toString(uint32_t addr);
std::shared_ptr<uint32_t> obtain(uint32_t max_try = 10);
static bool isMultiCastAddress(uint32_t iAddr){
static uint32_t addrMin = mINI::Instance()[MultiCast::kAddrMin].as<uint32_t>();
static uint32_t addrMax = mINI::Instance()[MultiCast::kAddrMax].as<uint32_t>();
return iAddr >= addrMin && iAddr <= addrMax;
}
static string toString(uint32_t iAddr){
iAddr = htonl(iAddr);
return SockUtil::inet_ntoa((struct in_addr &)(iAddr));
}
virtual ~MultiCastAddressMaker(){}
std::shared_ptr<uint32_t> obtain(uint32_t iTry = 10);
private:
MultiCastAddressMaker(){};
void release(uint32_t iAddr);
uint32_t _iAddr = 0;
MultiCastAddressMaker() {};
void release(uint32_t addr);
private:
uint32_t _addr = 0;
recursive_mutex _mtx;
unordered_set<uint32_t> _setBadAddr;
unordered_set<uint32_t> _used_addr;
};
class RtpMultiCaster {
public:
typedef std::shared_ptr<RtpMultiCaster> Ptr;
typedef function<void()> onDetach;
virtual ~RtpMultiCaster();
static Ptr get(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
~RtpMultiCaster();
static Ptr get(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream);
void setDetachCB(void *listener,const onDetach &cb);
uint16_t getPort(TrackType trackType);
string getIP();
string getMultiCasterIP();
uint16_t getMultiCasterPort(TrackType trackType);
private:
static recursive_mutex g_mtx;
static unordered_map<string , weak_ptr<RtpMultiCaster> > g_mapBroadCaster;
static Ptr make(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
RtpMultiCaster(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream);
std::shared_ptr<uint32_t> _multiAddr;
private:
recursive_mutex _mtx;
unordered_map<void * , onDetach > _mapDetach;
RtspMediaSource::RingType::RingReader::Ptr _pReader;
Socket::Ptr _apUdpSock[2];
struct sockaddr_in _aPeerUdpAddr[2];
RtpMultiCaster(const EventPoller::Ptr &poller,const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
Socket::Ptr _udp_sock[2];
std::shared_ptr<uint32_t> _multicast_ip;
unordered_map<void * , onDetach > _detach_map;
RtspMediaSource::RingType::RingReader::Ptr _rtp_reader;
};
}//namespace mediakit
#endif /* SRC_RTSP_RTPBROADCASTER_H_ */

View File

@@ -365,8 +365,10 @@ bool RtspUrl::setup(bool isSSL, const string &strUrl, const string &strUser, con
return true;
}
std::pair<Socket::Ptr, Socket::Ptr> makeSockPair_l(const EventPoller::Ptr &poller, const string &local_ip){
auto pSockRtp = std::make_shared<Socket>(poller);
static void makeSockPair_l(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip){
auto &pSockRtp = pair.first;
auto &pSockRtcp = pair.second;
if (!pSockRtp->bindUdpSock(0, local_ip.data())) {
//分配端口失败
throw runtime_error("open udp socket failed");
@@ -374,7 +376,6 @@ std::pair<Socket::Ptr, Socket::Ptr> makeSockPair_l(const EventPoller::Ptr &polle
//是否是偶数
bool even_numbers = pSockRtp->get_local_port() % 2 == 0;
auto pSockRtcp = std::make_shared<Socket>(poller);
if (!pSockRtcp->bindUdpSock(pSockRtp->get_local_port() + (even_numbers ? 1 : -1), local_ip.data())) {
//分配端口失败
throw runtime_error("open udp socket failed");
@@ -386,22 +387,21 @@ std::pair<Socket::Ptr, Socket::Ptr> makeSockPair_l(const EventPoller::Ptr &polle
pSockRtp = pSockRtcp;
pSockRtcp = tmp;
}
return std::make_pair(pSockRtp, pSockRtcp);
}
std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller, const string &local_ip){
int try_count = 0;
while (true) {
try {
return makeSockPair_l(poller, local_ip);
} catch (...) {
if (++try_count == 3) {
throw;
}
WarnL << "open udp socket failed, retry: " << try_count;
}
}
void makeSockPair(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip){
int try_count = 0;
while (true) {
try {
makeSockPair_l(pair, local_ip);
break;
} catch (...) {
if (++try_count == 3) {
throw;
}
WarnL << "open udp socket failed, retry: " << try_count;
}
}
}
string printSSRC(uint32_t ui32Ssrc) {

View File

@@ -271,7 +271,7 @@ private:
_StrPrinter _printer;
};
std::pair<Socket::Ptr, Socket::Ptr> makeSockPair(const EventPoller::Ptr &poller, const string &local_ip);
void makeSockPair(std::pair<Socket::Ptr, Socket::Ptr> &pair, const string &local_ip);
string printSSRC(uint32_t ui32Ssrc);
} //namespace mediakit

View File

@@ -211,7 +211,8 @@ void RtspPlayer::createUdpSockIfNecessary(int track_idx){
auto &rtpSockRef = _rtp_sock[track_idx];
auto &rtcpSockRef = _rtcp_sock[track_idx];
if (!rtpSockRef || !rtcpSockRef) {
auto pr = makeSockPair(getPoller(), get_local_ip());
std::pair<Socket::Ptr, Socket::Ptr> pr = std::make_pair(createSocket(), createSocket());
makeSockPair(pr, get_local_ip());
rtpSockRef = pr.first;
rtcpSockRef = pr.second;
}
@@ -280,7 +281,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int track_idx) {
if (_rtp_type == Rtsp::RTP_MULTICAST) {
//udp组播
auto multiAddr = FindField((strTransport + ";").data(), "destination=", ";");
pRtpSockRef.reset(new Socket(getPoller()));
pRtpSockRef = createSocket();
if (!pRtpSockRef->bindUdpSock(rtp_port, multiAddr.data())) {
pRtpSockRef.reset();
throw std::runtime_error("open udp sock err");

View File

@@ -122,7 +122,7 @@ void RtspPusher::onConnect(const SockException &err) {
return;
}
//推流器不需要多大的接收缓存,节省内存占用
_sock->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));
getSock()->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));
sendAnnounce();
}
@@ -228,7 +228,7 @@ bool RtspPusher::handleAuthenticationFailure(const string &params_str) {
void RtspPusher::createUdpSockIfNecessary(int track_idx){
auto &rtp_sock = _udp_socks[track_idx];
if (!rtp_sock) {
rtp_sock.reset(new Socket(getPoller()));
rtp_sock = createSocket();
//rtp随机端口
if (!rtp_sock->bindUdpSock(0, get_local_ip().data())) {
rtp_sock.reset();
@@ -400,7 +400,7 @@ void RtspPusher::setSocketFlags(){
if (merge_write_ms > 0) {
//提高发送性能
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
SockUtil::setNoDelay(_sock->rawFD(), false);
SockUtil::setNoDelay(getSock()->rawFD(), false);
}
}

View File

@@ -277,7 +277,7 @@ void RtspSession::handleReq_RECORD(const Parser &parser){
sendRtspResponse("200 OK", {"RTP-Info",rtp_info});
if(_rtp_type == Rtsp::RTP_TCP){
//如果是rtsp推流服务器并且是TCP推流那么加大TCP接收缓存这样能提升接收性能
_sock->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
getSock()->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
}
};
@@ -667,10 +667,10 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
break;
case Rtsp::RTP_UDP: {
std::pair<Socket::Ptr, Socket::Ptr> pr;
try{
pr = makeSockPair(_sock->getPoller(), get_local_ip());
}catch(std::exception &ex) {
std::pair<Socket::Ptr, Socket::Ptr> pr = std::make_pair(createSocket(),createSocket());
try {
makeSockPair(pr, get_local_ip());
} catch (std::exception &ex) {
//分配端口失败
send_NotAcceptable();
throw SockException(Err_shutdown, ex.what());
@@ -681,8 +681,8 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
//设置客户端内网端口信息
string strClientPort = FindField(parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16RtpPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
uint16_t ui16RtcpPort = atoi( FindField(strClientPort.data(), "-" , NULL).data());
uint16_t ui16RtpPort = atoi(FindField(strClientPort.data(), NULL, "-").data());
uint16_t ui16RtcpPort = atoi(FindField(strClientPort.data(), "-", NULL).data());
struct sockaddr_in peerAddr;
//设置rtp发送目标地址
@@ -690,14 +690,14 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
peerAddr.sin_port = htons(ui16RtpPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
pr.first->setSendPeerAddr((struct sockaddr *)(&peerAddr));
pr.first->setSendPeerAddr((struct sockaddr *) (&peerAddr));
//设置rtcp发送目标地址
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16RtcpPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
pr.second->setSendPeerAddr((struct sockaddr *)(&peerAddr));
pr.second->setSendPeerAddr((struct sockaddr *) (&peerAddr));
//尝试获取客户端nat映射地址
startListenPeerUdpData(trackIdx);
@@ -714,7 +714,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
break;
case Rtsp::RTP_MULTICAST: {
if(!_multicaster){
_multicaster = RtpMultiCaster::get(getPoller(), get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid);
_multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info._vhost, _media_info._app, _media_info._streamid);
if (!_multicaster) {
send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket");
@@ -728,10 +728,10 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
strongSelf->safeShutdown(SockException(Err_shutdown,"ring buffer detached"));
});
}
int iSrvPort = _multicaster->getPort(trackRef->_type);
int iSrvPort = _multicaster->getMultiCasterPort(trackRef->_type);
//我们用trackIdx区分rtp和rtcp包
//由于组播udp端口是共享的而rtcp端口为组播udp端口+1所以rtcp端口需要改成共享端口
auto pSockRtcp = UDPServer::Instance().getSock(getPoller(),get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
auto pSockRtcp = UDPServer::Instance().getSock(*this, get_local_ip().data(), 2 * trackIdx + 1, iSrvPort + 1);
if (!pSockRtcp) {
//分配端口失败
send_NotAcceptable();
@@ -742,7 +742,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
sendRtspResponse("200 OK",
{"Transport", StrPrinter << "RTP/AVP;multicast;"
<< "destination=" << _multicaster->getIP() << ";"
<< "destination=" << _multicaster->getMultiCasterIP() << ";"
<< "source=" << get_local_ip() << ";"
<< "port=" << iSrvPort << "-" << pSockRtcp->get_local_port() << ";"
<< "ttl=" << udpTTL << ";"
@@ -1230,7 +1230,7 @@ void RtspSession::setSocketFlags(){
GET_CONFIG(int, mergeWriteMS, General::kMergeWriteMS);
if(mergeWriteMS > 0) {
//推流模式下关闭TCP_NODELAY会增加推流端的延时但是服务器性能将提高
SockUtil::setNoDelay(_sock->rawFD(), false);
SockUtil::setNoDelay(getSock()->rawFD(), false);
//播放模式下开启MSG_MORE会增加延时但是能提高发送性能
setSendFlags(SOCKET_DEFAULE_FLAGS | FLAG_MORE);
}

View File

@@ -25,72 +25,71 @@ UDPServer::~UDPServer() {
InfoL;
}
Socket::Ptr UDPServer::getSock(const EventPoller::Ptr &poller,const char* strLocalIp, int intervaled,uint16_t iLocalPort) {
lock_guard<mutex> lck(_mtxUpdSock);
string strKey = StrPrinter << strLocalIp << ":" << intervaled << endl;
auto it = _mapUpdSock.find(strKey);
if (it == _mapUpdSock.end()) {
Socket::Ptr pSock(new Socket(poller));
//InfoL<<localIp;
if (!pSock->bindUdpSock(iLocalPort, strLocalIp)) {
Socket::Ptr UDPServer::getSock(SocketHelper &helper, const char* local_ip, int interleaved, uint16_t local_port) {
lock_guard<mutex> lck(_mtx_udp_sock);
string key = StrPrinter << local_ip << ":" << interleaved << endl;
auto it = _udp_sock_map.find(key);
if (it == _udp_sock_map.end()) {
Socket::Ptr sock = helper.createSocket();
if (!sock->bindUdpSock(local_port, local_ip)) {
//分配失败
return nullptr;
}
pSock->setOnRead(bind(&UDPServer::onRcvData, this, intervaled, placeholders::_1,placeholders::_2));
pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1));
_mapUpdSock[strKey] = pSock;
DebugL << strLocalIp << " " << pSock->get_local_port() << " " << intervaled;
return pSock;
sock->setOnErr(bind(&UDPServer::onErr, this, key, placeholders::_1));
sock->setOnRead(bind(&UDPServer::onRecv, this, interleaved, placeholders::_1, placeholders::_2));
_udp_sock_map[key] = sock;
DebugL << local_ip << " " << sock->get_local_port() << " " << interleaved;
return sock;
}
return it->second;
}
void UDPServer::listenPeer(const char* strPeerIp, void* pSelf, const onRecvData& cb) {
lock_guard<mutex> lck(_mtxDataHandler);
auto &mapRef = _mapDataHandler[strPeerIp];
mapRef.emplace(pSelf, cb);
void UDPServer::listenPeer(const char* peer_ip, void* obj, const onRecvData &cb) {
lock_guard<mutex> lck(_mtx_on_recv);
auto &ref = _on_recv_map[peer_ip];
ref.emplace(obj, cb);
}
void UDPServer::stopListenPeer(const char* strPeerIp, void* pSelf) {
lock_guard<mutex> lck(_mtxDataHandler);
auto it0 = _mapDataHandler.find(strPeerIp);
if (it0 == _mapDataHandler.end()) {
void UDPServer::stopListenPeer(const char* peer_ip, void* obj) {
lock_guard<mutex> lck(_mtx_on_recv);
auto it0 = _on_recv_map.find(peer_ip);
if (it0 == _on_recv_map.end()) {
return;
}
auto &mapRef = it0->second;
auto it1 = mapRef.find(pSelf);
if (it1 != mapRef.end()) {
mapRef.erase(it1);
auto &ref = it0->second;
auto it1 = ref.find(obj);
if (it1 != ref.end()) {
ref.erase(it1);
}
if (mapRef.size() == 0) {
_mapDataHandler.erase(it0);
if (ref.size() == 0) {
_on_recv_map.erase(it0);
}
}
void UDPServer::onErr(const string& strKey, const SockException& err) {
void UDPServer::onErr(const string &key, const SockException &err) {
WarnL << err.what();
lock_guard<mutex> lck(_mtxUpdSock);
_mapUpdSock.erase(strKey);
lock_guard<mutex> lck(_mtx_udp_sock);
_udp_sock_map.erase(key);
}
void UDPServer::onRcvData(int intervaled, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) {
//TraceL << trackIndex;
struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr;
string peerIp = SockUtil::inet_ntoa(in->sin_addr);
lock_guard<mutex> lck(_mtxDataHandler);
auto it0 = _mapDataHandler.find(peerIp);
if (it0 == _mapDataHandler.end()) {
void UDPServer::onRecv(int interleaved, const Buffer::Ptr &buf, struct sockaddr* peer_addr) {
struct sockaddr_in *in = (struct sockaddr_in *) peer_addr;
string peer_ip = SockUtil::inet_ntoa(in->sin_addr);
lock_guard<mutex> lck(_mtx_on_recv);
auto it0 = _on_recv_map.find(peer_ip);
if (it0 == _on_recv_map.end()) {
return;
}
auto &mapRef = it0->second;
for (auto it1 = mapRef.begin(); it1 != mapRef.end(); ++it1) {
onRecvData &funRef = it1->second;
if (!funRef(intervaled, pBuf, pPeerAddr)) {
it1 = mapRef.erase(it1);
auto &ref = it0->second;
for (auto it1 = ref.begin(); it1 != ref.end(); ++it1) {
auto &func = it1->second;
if (!func(interleaved, buf, peer_addr)) {
it1 = ref.erase(it1);
}
}
if (mapRef.size() == 0) {
_mapDataHandler.erase(it0);
if (ref.size() == 0) {
_on_recv_map.erase(it0);
}
}

View File

@@ -30,18 +30,20 @@ public:
typedef function< bool(int intervaled, const Buffer::Ptr &buffer, struct sockaddr *peer_addr)> onRecvData;
~UDPServer();
static UDPServer &Instance();
Socket::Ptr getSock(const EventPoller::Ptr &poller,const char *strLocalIp, int intervaled,uint16_t iLocalPort = 0);
void listenPeer(const char *strPeerIp, void *pSelf, const onRecvData &cb);
void stopListenPeer(const char *strPeerIp, void *pSelf);
Socket::Ptr getSock(SocketHelper &helper, const char *local_ip, int interleaved, uint16_t local_port = 0);
void listenPeer(const char *peer_ip, void *obj, const onRecvData &cb);
void stopListenPeer(const char *peer_ip, void *obj);
private:
UDPServer();
void onRcvData(int intervaled, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr);
void onRecv(int interleaved, const Buffer::Ptr &buf, struct sockaddr *peer_addr);
void onErr(const string &strKey,const SockException &err);
unordered_map<string, Socket::Ptr> _mapUpdSock;
mutex _mtxUpdSock;
unordered_map<string, unordered_map<void *, onRecvData> > _mapDataHandler;
mutex _mtxDataHandler;
private:
mutex _mtx_udp_sock;
mutex _mtx_on_recv;
unordered_map<string, Socket::Ptr> _udp_sock_map;
unordered_map<string, unordered_map<void *, onRecvData> > _on_recv_map;
};
} /* namespace mediakit */