统一成员变量命名风格

This commit is contained in:
xiongziliang
2018-10-24 15:43:52 +08:00
parent 97567ec36d
commit 39baaebc55
74 changed files with 2048 additions and 2048 deletions

View File

@@ -50,17 +50,17 @@ static uint32_t addressToInt(const string &ip){
}
std::shared_ptr<uint32_t> MultiCastAddressMaker::obtain(uint32_t iTry) {
lock_guard<recursive_mutex> lck(m_mtx);
lock_guard<recursive_mutex> lck(_mtx);
GET_CONFIG_AND_REGISTER(string,addrMinStr,Config::MultiCast::kAddrMin);
GET_CONFIG_AND_REGISTER(string,addrMaxStr,Config::MultiCast::kAddrMax);
uint32_t addrMin = addressToInt(addrMinStr);
uint32_t addrMax = addressToInt(addrMaxStr);
if(m_iAddr > addrMax || m_iAddr == 0){
m_iAddr = addrMin;
if(_iAddr > addrMax || _iAddr == 0){
_iAddr = addrMin;
}
auto iGotAddr = m_iAddr++;
if(m_setBadAddr.find(iGotAddr) != m_setBadAddr.end()){
auto iGotAddr = _iAddr++;
if(_setBadAddr.find(iGotAddr) != _setBadAddr.end()){
//已经分配过了
if(iTry){
return obtain(--iTry);
@@ -69,7 +69,7 @@ std::shared_ptr<uint32_t> MultiCastAddressMaker::obtain(uint32_t iTry) {
ErrorL;
return nullptr;
}
m_setBadAddr.emplace(iGotAddr);
_setBadAddr.emplace(iGotAddr);
std::shared_ptr<uint32_t> ret(new uint32_t(iGotAddr),[](uint32_t *ptr){
MultiCastAddressMaker::Instance().release(*ptr);
delete ptr;
@@ -77,8 +77,8 @@ std::shared_ptr<uint32_t> MultiCastAddressMaker::obtain(uint32_t iTry) {
return ret;
}
void MultiCastAddressMaker::release(uint32_t iAddr){
lock_guard<recursive_mutex> lck(m_mtx);
m_setBadAddr.erase(iAddr);
lock_guard<recursive_mutex> lck(_mtx);
_setBadAddr.erase(iAddr);
}
@@ -86,16 +86,16 @@ recursive_mutex RtpBroadCaster::g_mtx;
unordered_map<string, weak_ptr<RtpBroadCaster> > RtpBroadCaster::g_mapBroadCaster;
void RtpBroadCaster::setDetachCB(void* listener, const onDetach& cb) {
lock_guard<recursive_mutex> lck(m_mtx);
lock_guard<recursive_mutex> lck(_mtx);
if(cb){
m_mapDetach.emplace(listener,cb);
_mapDetach.emplace(listener,cb);
}else{
m_mapDetach.erase(listener);
_mapDetach.erase(listener);
}
}
RtpBroadCaster::~RtpBroadCaster() {
m_pReader->setReadCB(nullptr);
m_pReader->setDetachCB(nullptr);
_pReader->setReadCB(nullptr);
_pReader->setDetachCB(nullptr);
DebugL;
}
RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream) {
@@ -104,55 +104,55 @@ RtpBroadCaster::RtpBroadCaster(const string &strLocalIp,const string &strVhost,c
auto strErr = StrPrinter << "未找到媒体源:" << strVhost << " " << strApp << " " << strStream << endl;
throw std::runtime_error(strErr);
}
m_multiAddr = MultiCastAddressMaker::Instance().obtain();
_multiAddr = MultiCastAddressMaker::Instance().obtain();
for(auto i = 0; i < 2; i++){
m_apUdpSock[i].reset(new Socket());
if(!m_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){
_apUdpSock[i].reset(new Socket());
if(!_apUdpSock[i]->bindUdpSock(0, strLocalIp.data())){
auto strErr = StrPrinter << "绑定UDP端口失败:" << strLocalIp << endl;
throw std::runtime_error(strErr);
}
auto fd = m_apUdpSock[i]->rawFD();
auto fd = _apUdpSock[i]->rawFD();
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,Config::MultiCast::kUdpTTL);
SockUtil::setMultiTTL(fd, udpTTL);
SockUtil::setMultiLOOP(fd, false);
SockUtil::setMultiIF(fd, strLocalIp.data());
struct sockaddr_in &peerAddr = m_aPeerUdpAddr[i];
struct sockaddr_in &peerAddr = _aPeerUdpAddr[i];
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(m_apUdpSock[i]->get_local_port());
peerAddr.sin_addr.s_addr = htonl(*m_multiAddr);
peerAddr.sin_port = htons(_apUdpSock[i]->get_local_port());
peerAddr.sin_addr.s_addr = htonl(*_multiAddr);
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
}
m_pReader = src->getRing()->attach();
m_pReader->setReadCB([this](const RtpPacket::Ptr &pkt){
_pReader = src->getRing()->attach();
_pReader->setReadCB([this](const RtpPacket::Ptr &pkt){
int i = (int)(pkt->type);
auto &pSock = m_apUdpSock[i];
auto &peerAddr = m_aPeerUdpAddr[i];
auto &pSock = _apUdpSock[i];
auto &peerAddr = _aPeerUdpAddr[i];
BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
pSock->send(buffer,SOCKET_DEFAULE_FLAGS | FLAG_MORE,(struct sockaddr *)(&peerAddr));
});
m_pReader->setDetachCB([this](){
unordered_map<void * , onDetach > m_mapDetach_copy;
_pReader->setDetachCB([this](){
unordered_map<void * , onDetach > _mapDetach_copy;
{
lock_guard<recursive_mutex> lck(m_mtx);
m_mapDetach_copy = std::move(m_mapDetach);
lock_guard<recursive_mutex> lck(_mtx);
_mapDetach_copy = std::move(_mapDetach);
}
for(auto &pr : m_mapDetach_copy){
for(auto &pr : _mapDetach_copy){
pr.second();
}
});
DebugL << MultiCastAddressMaker::toString(*m_multiAddr) << " "
<< m_apUdpSock[0]->get_local_port() << " "
<< m_apUdpSock[1]->get_local_port() << " "
DebugL << MultiCastAddressMaker::toString(*_multiAddr) << " "
<< _apUdpSock[0]->get_local_port() << " "
<< _apUdpSock[1]->get_local_port() << " "
<< strVhost << " "
<< strApp << " " << strStream;
}
uint16_t RtpBroadCaster::getPort(TrackType trackType){
return m_apUdpSock[trackType]->get_local_port();
return _apUdpSock[trackType]->get_local_port();
}
string RtpBroadCaster::getIP(){
return inet_ntoa(m_aPeerUdpAddr[0].sin_addr);
return inet_ntoa(_aPeerUdpAddr[0].sin_addr);
}
RtpBroadCaster::Ptr RtpBroadCaster::make(const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream){
try{

View File

@@ -64,9 +64,9 @@ public:
private:
MultiCastAddressMaker(){};
void release(uint32_t iAddr);
uint32_t m_iAddr = 0;
recursive_mutex m_mtx;
unordered_set<uint32_t> m_setBadAddr;
uint32_t _iAddr = 0;
recursive_mutex _mtx;
unordered_set<uint32_t> _setBadAddr;
};
class RtpBroadCaster {
public:
@@ -82,12 +82,12 @@ private:
static unordered_map<string , weak_ptr<RtpBroadCaster> > g_mapBroadCaster;
static Ptr make(const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);
std::shared_ptr<uint32_t> m_multiAddr;
recursive_mutex m_mtx;
unordered_map<void * , onDetach > m_mapDetach;
RtspMediaSource::RingType::RingReader::Ptr m_pReader;
Socket::Ptr m_apUdpSock[2];
struct sockaddr_in m_aPeerUdpAddr[2];
std::shared_ptr<uint32_t> _multiAddr;
recursive_mutex _mtx;
unordered_map<void * , onDetach > _mapDetach;
RtspMediaSource::RingType::RingReader::Ptr _pReader;
Socket::Ptr _apUdpSock[2];
struct sockaddr_in _aPeerUdpAddr[2];
RtpBroadCaster(const string &strLocalIp,const string &strVhost,const string &strApp,const string &strStream);

View File

@@ -68,7 +68,7 @@ RtpParser::RtpParser(const string& sdp) {
break;
}
}
m_fDuration = getTimeInSDP(sdp);
_fDuration = getTimeInSDP(sdp);
}
bool RtpParser::inputRtp(const RtpPacket::Ptr & rtp) {

View File

@@ -51,7 +51,7 @@ public:
bool inputRtp(const RtpPacket::Ptr &rtp);
float getDuration() const override {
return m_fDuration;
return _fDuration;
}
/**
@@ -76,7 +76,7 @@ private:
inline void onGetAudioTrack(const RtspTrack &audio);
inline void onGetVideoTrack(const RtspTrack &video);
private:
float m_fDuration = 0;
float _fDuration = 0;
AudioTrack::Ptr _audioTrack;
VideoTrack::Ptr _videoTrack;
RtpCodec::Ptr _audioRtpDecoder;

View File

@@ -83,79 +83,79 @@ public:
break;
}
if (start == buf) {
m_strMethod = FindField(line.c_str(), NULL, " ");
m_strFullUrl = FindField(line.c_str(), " ", " ");
auto args_pos = m_strFullUrl.find('?');
_strMethod = FindField(line.c_str(), NULL, " ");
_strFullUrl = FindField(line.c_str(), " ", " ");
auto args_pos = _strFullUrl.find('?');
if(args_pos != string::npos){
m_strUrl = m_strFullUrl.substr(0,args_pos);
m_mapUrlArgs = parseArgs(m_strFullUrl.substr(args_pos + 1 ));
_strUrl = _strFullUrl.substr(0,args_pos);
_mapUrlArgs = parseArgs(_strFullUrl.substr(args_pos + 1 ));
}else{
m_strUrl = m_strFullUrl;
_strUrl = _strFullUrl;
}
m_strTail = FindField(line.c_str(), (m_strFullUrl + " ").c_str(), NULL);
_strTail = FindField(line.c_str(), (_strFullUrl + " ").c_str(), NULL);
} else {
auto field = FindField(line.c_str(), NULL, ": ");
auto value = FindField(line.c_str(), ": ", NULL);
if (field.size() != 0) {
m_mapHeaders[field] = value;
_mapHeaders[field] = value;
}
}
start = start + line.size() + 2;
if (strncmp(start, "\r\n", 2) == 0) { //协议解析完毕
m_strContent = FindField(start, "\r\n", NULL);
_strContent = FindField(start, "\r\n", NULL);
break;
}
}
}
const string& Method() const {
//rtsp方法
return m_strMethod;
return _strMethod;
}
const string& Url() const {
//rtsp url
return m_strUrl;
return _strUrl;
}
const string& FullUrl() const {
//rtsp url with args
return m_strFullUrl;
return _strFullUrl;
}
const string& Tail() const {
//RTSP/1.0
return m_strTail;
return _strTail;
}
const string& operator[](const char *name) const {
//rtsp field
auto it = m_mapHeaders.find(name);
if (it == m_mapHeaders.end()) {
return m_strNull;
auto it = _mapHeaders.find(name);
if (it == _mapHeaders.end()) {
return _strNull;
}
return it->second;
}
const string& Content() const {
return m_strContent;
return _strContent;
}
void Clear() {
m_strMethod.clear();
m_strUrl.clear();
m_strFullUrl.clear();
m_strTail.clear();
m_strContent.clear();
m_mapHeaders.clear();
m_mapUrlArgs.clear();
_strMethod.clear();
_strUrl.clear();
_strFullUrl.clear();
_strTail.clear();
_strContent.clear();
_mapHeaders.clear();
_mapUrlArgs.clear();
}
void setUrl(const string& url) {
this->m_strUrl = url;
this->_strUrl = url;
}
void setContent(const string& content) {
this->m_strContent = content;
this->_strContent = content;
}
StrCaseMap& getValues() const {
return m_mapHeaders;
return _mapHeaders;
}
StrCaseMap& getUrlArgs() const {
return m_mapUrlArgs;
return _mapUrlArgs;
}
static StrCaseMap parseArgs(const string &str,const char *pair_delim = "&", const char *key_delim = "="){
@@ -170,14 +170,14 @@ public:
}
private:
string m_strMethod;
string m_strUrl;
string m_strTail;
string m_strContent;
string m_strNull;
string m_strFullUrl;
mutable StrCaseMap m_mapHeaders;
mutable StrCaseMap m_mapUrlArgs;
string _strMethod;
string _strUrl;
string _strTail;
string _strContent;
string _strNull;
string _strFullUrl;
mutable StrCaseMap _mapHeaders;
mutable StrCaseMap _mapUrlArgs;
};
typedef struct {

View File

@@ -60,46 +60,46 @@ public:
RtspMediaSource(const string &strVhost,const string &strApp, const string &strId) :
MediaSource(RTSP_SCHEMA,strVhost,strApp,strId),
m_pRing(new RingBuffer<RtpPacket::Ptr>()) {
_pRing(new RingBuffer<RtpPacket::Ptr>()) {
}
virtual ~RtspMediaSource() {}
const RingType::Ptr &getRing() const {
//获取媒体源的rtp环形缓冲
return m_pRing;
return _pRing;
}
const string& getSdp() const {
//获取该源的媒体描述信息
return m_strSdp;
return _strSdp;
}
virtual uint32_t getSsrc(TrackType trackType) {
return m_mapTracks[trackType].ssrc;
return _mapTracks[trackType].ssrc;
}
virtual uint16_t getSeqence(TrackType trackType) {
return m_mapTracks[trackType].seq;
return _mapTracks[trackType].seq;
}
virtual uint32_t getTimestamp(TrackType trackType) {
return m_mapTracks[trackType].timeStamp;
return _mapTracks[trackType].timeStamp;
}
virtual void onGetSDP(const string& sdp) {
//派生类设置该媒体源媒体描述信息
m_strSdp = sdp;
_strSdp = sdp;
regist();
}
virtual void onGetRTP(const RtpPacket::Ptr &rtppt, bool keyPos) {
auto &trackRef = m_mapTracks[rtppt->type];
auto &trackRef = _mapTracks[rtppt->type];
trackRef.seq = rtppt->sequence;
trackRef.timeStamp = rtppt->timeStamp;
trackRef.ssrc = rtppt->ssrc;
trackRef.type = rtppt->type;
m_pRing->write(rtppt,keyPos);
_pRing->write(rtppt,keyPos);
}
protected:
unordered_map<int, RtspTrack> m_mapTracks;
string m_strSdp; //媒体描述信息
RingType::Ptr m_pRing; //rtp环形缓冲
unordered_map<int, RtspTrack> _mapTracks;
string _strSdp; //媒体描述信息
RingType::Ptr _pRing; //rtp环形缓冲
};
} /* namespace Rtsp */

View File

@@ -44,9 +44,9 @@ namespace ZL {
namespace Rtsp {
#define POP_HEAD(trackidx) \
auto it = m_amapRtpSort[trackidx].begin(); \
auto it = _amapRtpSort[trackidx].begin(); \
onRecvRTP_l(it->second, trackidx); \
m_amapRtpSort[trackidx].erase(it);
_amapRtpSort[trackidx].erase(it);
#define RTP_BUF_SIZE (4 * 1024)
@@ -54,48 +54,48 @@ const char kRtspMd5Nonce[] = "rtsp_md5_nonce";
const char kRtspRealm[] = "rtsp_realm";
RtspPlayer::RtspPlayer(void){
m_pktPool.setSize(64);
_pktPool.setSize(64);
}
RtspPlayer::~RtspPlayer(void) {
teardown();
if (m_pucRtpBuf) {
delete[] m_pucRtpBuf;
m_pucRtpBuf = nullptr;
if (_pucRtpBuf) {
delete[] _pucRtpBuf;
_pucRtpBuf = nullptr;
}
DebugL<<endl;
}
void RtspPlayer::teardown(){
if (alive()) {
sendRtspRequest("TEARDOWN" ,m_strContentBase);
sendRtspRequest("TEARDOWN" ,_strContentBase);
shutdown();
}
erase(kRtspMd5Nonce);
erase(kRtspRealm);
m_uiTrackCnt = 0;
m_onHandshake = nullptr;
m_uiRtpBufLen = 0;
m_strSession.clear();
m_uiCseq = 1;
m_strContentBase.clear();
CLEAR_ARR(m_apUdpSock);
CLEAR_ARR(m_aui16LastSeq)
CLEAR_ARR(m_aui16FirstSeq)
CLEAR_ARR(m_aui32SsrcErrorCnt)
CLEAR_ARR(m_aui64RtpRecv)
CLEAR_ARR(m_aui64SeqOkCnt)
CLEAR_ARR(m_abSortStarted)
CLEAR_ARR(m_aui64RtpRecv)
CLEAR_ARR(m_aui16NowSeq)
m_amapRtpSort[0].clear();
m_amapRtpSort[1].clear();
_uiTrackCnt = 0;
_onHandshake = nullptr;
_uiRtpBufLen = 0;
_strSession.clear();
_uiCseq = 1;
_strContentBase.clear();
CLEAR_ARR(_apUdpSock);
CLEAR_ARR(_aui16LastSeq)
CLEAR_ARR(_aui16FirstSeq)
CLEAR_ARR(_aui32SsrcErrorCnt)
CLEAR_ARR(_aui64RtpRecv)
CLEAR_ARR(_aui64SeqOkCnt)
CLEAR_ARR(_abSortStarted)
CLEAR_ARR(_aui64RtpRecv)
CLEAR_ARR(_aui16NowSeq)
_amapRtpSort[0].clear();
_amapRtpSort[1].clear();
m_pBeatTimer.reset();
m_pPlayTimer.reset();
m_pRtpTimer.reset();
m_fSeekTo = 0;
CLEAR_ARR(m_adFistStamp);
CLEAR_ARR(m_adNowStamp);
_pBeatTimer.reset();
_pPlayTimer.reset();
_pRtpTimer.reset();
_fSeekTo = 0;
CLEAR_ARR(_adFistStamp);
CLEAR_ARR(_adNowStamp);
}
void RtspPlayer::play(const char* strUrl){
@@ -131,9 +131,9 @@ void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPw
(*this)[kRtspPwdIsMD5] = false;
}
m_eType = eType;
if (m_eType == RTP_TCP && !m_pucRtpBuf) {
m_pucRtpBuf = new uint8_t[RTP_BUF_SIZE];
_eType = eType;
if (_eType == RTP_TCP && !_pucRtpBuf) {
_pucRtpBuf = new uint8_t[RTP_BUF_SIZE];
}
auto ip = FindField(strUrl, "://", "/");
if (!ip.size()) {
@@ -148,7 +148,7 @@ void RtspPlayer::play(const char* strUrl, const char *strUser, const char *strPw
ip = FindField(ip.c_str(), NULL, ":");
}
m_strUrl = strUrl;
_strUrl = strUrl;
if(!(*this)[PlayerBase::kNetAdapter].empty()){
setNetAdapter((*this)[PlayerBase::kNetAdapter]);
}
@@ -164,7 +164,7 @@ void RtspPlayer::onConnect(const SockException &err){
sendDescribe();
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
m_pPlayTimer.reset( new Timer(10, [weakSelf]() {
_pPlayTimer.reset( new Timer(10, [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
@@ -178,7 +178,7 @@ void RtspPlayer::onConnect(const SockException &err){
void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
const char *buf = pBuf->data();
int size = pBuf->size();
if (m_onHandshake) {
if (_onHandshake) {
//rtsp回复
int offset = 0;
while(offset < size - 4){
@@ -203,16 +203,16 @@ void RtspPlayer::onRecv(const Buffer::Ptr& pBuf) {
}
}
if (m_eType == RTP_TCP && m_pucRtpBuf) {
if (_eType == RTP_TCP && _pucRtpBuf) {
//RTP data
while (size > 0) {
int added = RTP_BUF_SIZE - m_uiRtpBufLen;
int added = RTP_BUF_SIZE - _uiRtpBufLen;
added = (added > size ? size : added);
memcpy(m_pucRtpBuf + m_uiRtpBufLen, buf, added);
m_uiRtpBufLen += added;
memcpy(_pucRtpBuf + _uiRtpBufLen, buf, added);
_uiRtpBufLen += added;
size -= added;
buf += added;
splitRtp(m_pucRtpBuf, m_uiRtpBufLen);
splitRtp(_pucRtpBuf, _uiRtpBufLen);
}
}
}
@@ -263,13 +263,13 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
StrPrinter << "DESCRIBE:" << parser.Url() << " " << parser.Tail() << endl);
}
auto strSdp = parser.Content();
m_strContentBase = parser["Content-Base"];
_strContentBase = parser["Content-Base"];
if(m_strContentBase.empty()){
m_strContentBase = m_strUrl;
if(_strContentBase.empty()){
_strContentBase = _strUrl;
}
if (m_strContentBase.back() == '/') {
m_strContentBase.pop_back();
if (_strContentBase.back() == '/') {
_strContentBase.pop_back();
}
auto iLen = atoi(parser["Content-Length"].data());
@@ -278,26 +278,26 @@ void RtspPlayer::handleResDESCRIBE(const Parser& parser) {
}
//解析sdp
m_uiTrackCnt = parserSDP(strSdp, m_aTrackInfo);
for (unsigned int i=0; i<m_uiTrackCnt; i++) {
m_aTrackInfo[i].ssrc=0;
m_aui32SsrcErrorCnt[i]=0;
_uiTrackCnt = parserSDP(strSdp, _aTrackInfo);
for (unsigned int i=0; i<_uiTrackCnt; i++) {
_aTrackInfo[i].ssrc=0;
_aui32SsrcErrorCnt[i]=0;
}
if (!m_uiTrackCnt) {
if (!_uiTrackCnt) {
throw std::runtime_error("解析SDP失败");
}
if (!onCheckSDP(strSdp, m_aTrackInfo, m_uiTrackCnt)) {
if (!onCheckSDP(strSdp, _aTrackInfo, _uiTrackCnt)) {
throw std::runtime_error("onCheckSDP faied");
}
sendSetup(0);
}
//发送SETUP命令
bool RtspPlayer::sendSetup(unsigned int trackIndex) {
m_onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex);
_onHandshake = std::bind(&RtspPlayer::handleResSETUP,this, placeholders::_1,trackIndex);
auto &track = m_aTrackInfo[trackIndex];
auto baseUrl = m_strContentBase + "/" + track.controlSuffix;
switch (m_eType) {
auto &track = _aTrackInfo[trackIndex];
auto baseUrl = _strContentBase + "/" + track.controlSuffix;
switch (_eType) {
case RTP_TCP: {
StrCaseMap header;
header["Transport"] = StrPrinter << "RTP/AVP/TCP;unicast;interleaved=" << track.type * 2 << "-" << track.type * 2 + 1;
@@ -311,12 +311,12 @@ bool RtspPlayer::sendSetup(unsigned int trackIndex) {
}
break;
case RTP_UDP: {
m_apUdpSock[trackIndex].reset(new Socket());
if (!m_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
m_apUdpSock[trackIndex].reset();
_apUdpSock[trackIndex].reset(new Socket());
if (!_apUdpSock[trackIndex]->bindUdpSock(0, get_local_ip().data())) {
_apUdpSock[trackIndex].reset();
throw std::runtime_error("open udp sock err");
}
int port = m_apUdpSock[trackIndex]->get_local_port();
int port = _apUdpSock[trackIndex]->get_local_port();
StrCaseMap header;
header["Transport"] = StrPrinter << "RTP/AVP;unicast;client_port=" << port << "-" << port + 1;
return sendRtspRequest("SETUP",baseUrl,header);
@@ -334,33 +334,33 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
StrPrinter << "SETUP:" << parser.Url() << " " << parser.Tail() << endl);
}
if (uiTrackIndex == 0) {
m_strSession = parser["Session"];
m_strSession.append(";");
m_strSession = FindField(m_strSession.data(), nullptr, ";");
_strSession = parser["Session"];
_strSession.append(";");
_strSession = FindField(_strSession.data(), nullptr, ";");
}
auto strTransport = parser["Transport"];
if(strTransport.find("TCP") != string::npos){
m_eType = RTP_TCP;
_eType = RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
m_eType = RTP_MULTICAST;
_eType = RTP_MULTICAST;
}else{
m_eType = RTP_UDP;
_eType = RTP_UDP;
}
if(m_eType == RTP_TCP) {
if(_eType == RTP_TCP) {
string interleaved = FindField( FindField((strTransport + ";").c_str(), "interleaved=", ";").c_str(), NULL, "-");
m_aTrackInfo[uiTrackIndex].interleaved = atoi(interleaved.c_str());
_aTrackInfo[uiTrackIndex].interleaved = atoi(interleaved.c_str());
}else{
const char *strPos = (m_eType == RTP_MULTICAST ? "port=" : "server_port=") ;
const char *strPos = (_eType == RTP_MULTICAST ? "port=" : "server_port=") ;
auto port_str = FindField((strTransport + ";").c_str(), strPos, ";");
uint16_t port = atoi(FindField(port_str.c_str(), NULL, "-").c_str());
auto &pUdpSockRef = m_apUdpSock[uiTrackIndex];
auto &pUdpSockRef = _apUdpSock[uiTrackIndex];
if(!pUdpSockRef){
pUdpSockRef.reset(new Socket());
}
if (m_eType == RTP_MULTICAST) {
if (_eType == RTP_MULTICAST) {
auto multiAddr = FindField((strTransport + ";").c_str(), "destination=", ";");
if (!pUdpSockRef->bindUdpSock(port, "0.0.0.0")) {
pUdpSockRef.reset();
@@ -379,14 +379,14 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}
}
if (uiTrackIndex < m_uiTrackCnt - 1) {
if (uiTrackIndex < _uiTrackCnt - 1) {
//需要继续发送SETUP命令
sendSetup(uiTrackIndex + 1);
return;
}
for (unsigned int i = 0; i < m_uiTrackCnt && m_eType != RTP_TCP; i++) {
auto &pUdpSockRef = m_apUdpSock[i];
for (unsigned int i = 0; i < _uiTrackCnt && _eType != RTP_TCP; i++) {
auto &pUdpSockRef = _apUdpSock[i];
if(!pUdpSockRef){
continue;
}
@@ -406,7 +406,7 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}
/////////////////////////心跳/////////////////////////////////
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
m_pBeatTimer.reset(new Timer(5, [weakSelf](){
_pBeatTimer.reset(new Timer(5, [weakSelf](){
auto strongSelf = weakSelf.lock();
if (!strongSelf){
return false;
@@ -417,47 +417,47 @@ void RtspPlayer::handleResSETUP(const Parser &parser, unsigned int uiTrackIndex)
}
bool RtspPlayer::sendOptions() {
m_onHandshake = [](const Parser& parser){
_onHandshake = [](const Parser& parser){
return true;
};
return sendRtspRequest("OPTIONS",m_strContentBase);
return sendRtspRequest("OPTIONS",_strContentBase);
}
bool RtspPlayer::sendDescribe() {
//发送DESCRIBE命令后处理函数:handleResDESCRIBE
m_onHandshake = std::bind(&RtspPlayer::handleResDESCRIBE,this, placeholders::_1);
_onHandshake = std::bind(&RtspPlayer::handleResDESCRIBE,this, placeholders::_1);
StrCaseMap header;
header["Accept"] = "application/sdp";
return sendRtspRequest("DESCRIBE",m_strUrl,header);
return sendRtspRequest("DESCRIBE",_strUrl,header);
}
bool RtspPlayer::sendPause(bool bPause,float fTime){
if(!bPause){
//修改时间轴
m_aNowStampTicker[0].resetTime();
m_aNowStampTicker[1].resetTime();
_aNowStampTicker[0].resetTime();
_aNowStampTicker[1].resetTime();
float iTimeInc = fTime - getProgressTime();
for(unsigned int i = 0 ;i < m_uiTrackCnt ;i++){
if (m_aTrackInfo[i].type == TrackVideo) {
m_adFistStamp[i] = m_adNowStamp[i] + iTimeInc * 90000.0;
}else if (m_aTrackInfo[i].type == TrackAudio){
for(unsigned int i = 0 ;i < _uiTrackCnt ;i++){
if (_aTrackInfo[i].type == TrackVideo) {
_adFistStamp[i] = _adNowStamp[i] + iTimeInc * 90000.0;
}else if (_aTrackInfo[i].type == TrackAudio){
//todo(xzl) 修复此处
// m_adFistStamp[i] = m_adNowStamp[i] + iTimeInc * getAudioSampleRate();
// _adFistStamp[i] = _adNowStamp[i] + iTimeInc * getAudioSampleRate();
}
m_adNowStamp[i] = m_adFistStamp[i];
_adNowStamp[i] = _adFistStamp[i];
}
m_fSeekTo = fTime;
_fSeekTo = fTime;
}
//开启或暂停rtsp
m_onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause);
_onHandshake = std::bind(&RtspPlayer::handleResPAUSE,this, placeholders::_1,bPause);
StrCaseMap header;
char buf[8];
sprintf(buf,"%.2f",fTime);
header["Range"] = StrPrinter << "npt=" << buf << "-";
return sendRtspRequest(bPause ? "PAUSE" : "PLAY",m_strContentBase,header);
return sendRtspRequest(bPause ? "PAUSE" : "PLAY",_strContentBase,header);
}
void RtspPlayer::pause(bool bPause) {
sendPause(bPause,getProgressTime());
@@ -470,16 +470,16 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
}
if (!bPause) {
//修正时间轴
m_aNowStampTicker[0].resetTime();
m_aNowStampTicker[1].resetTime();
_aNowStampTicker[0].resetTime();
_aNowStampTicker[1].resetTime();
auto strRange = parser["Range"];
if (strRange.size()) {
auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") {
strStart = "0";
}
m_fSeekTo = atof(strStart.data());
DebugL << "Range:" << m_fSeekTo << " " << strStart ;
_fSeekTo = atof(strStart.data());
DebugL << "Range:" << _fSeekTo << " " << strStart ;
}
auto strRtpInfo = parser["RTP-Info"];
if (strRtpInfo.size()) {
@@ -490,14 +490,14 @@ void RtspPlayer::handleResPAUSE(const Parser& parser, bool bPause) {
auto strControlSuffix = strTrack.substr(1 + strTrack.rfind('/'),strTrack.find(';') - strTrack.rfind('/') - 1);
auto strRtpTime = FindField(strTrack.data(), "rtptime=", ";");
auto iIdx = getTrackIndexByControlSuffix(strControlSuffix);
m_adFistStamp[iIdx] = atoll(strRtpTime.data());
m_adNowStamp[iIdx] = m_adFistStamp[iIdx];
_adFistStamp[iIdx] = atoll(strRtpTime.data());
_adNowStamp[iIdx] = _adFistStamp[iIdx];
DebugL << "rtptime:" << strControlSuffix <<" " << strRtpTime;
}
}
onPlayResult_l(SockException(Err_success, "rtsp play success"));
} else {
m_pRtpTimer.reset();
_pRtpTimer.reset();
}
}
@@ -518,8 +518,8 @@ int RtspPlayer::onProcess(const char* pcBuf) {
parser.setContent(strContent);
}
}
auto fun = m_onHandshake;
m_onHandshake = nullptr;
auto fun = _onHandshake;
_onHandshake = nullptr;
if(fun){
fun(parser);
}
@@ -576,7 +576,7 @@ void RtspPlayer::splitRtp(unsigned char* pucRtp, unsigned int uiLen) {
uiLen -= (pos - rtp_ptr);
rtp_ptr = pos;
}
m_uiRtpBufLen = uiLen;
_uiRtpBufLen = uiLen;
if (rtp_ptr != pucRtp) {
memmove(pucRtp, rtp_ptr, uiLen);
}
@@ -589,8 +589,8 @@ void RtspPlayer::splitRtp(unsigned char* pucRtp, unsigned int uiLen) {
bool RtspPlayer::handleOneRtp(int iTrackidx, unsigned char *pucData, unsigned int uiLen) {
auto &track = m_aTrackInfo[iTrackidx];
auto pt_ptr=m_pktPool.obtain();
auto &track = _aTrackInfo[iTrackidx];
auto pt_ptr=_pktPool.obtain();
auto &rtppt=*pt_ptr;
rtppt.interleaved = track.interleaved;
rtppt.length = uiLen + 4;
@@ -613,13 +613,13 @@ bool RtspPlayer::handleOneRtp(int iTrackidx, unsigned char *pucData, unsigned in
} else if (track.ssrc != rtppt.ssrc) {
//ssrc错误
WarnL << "ssrc错误";
if (m_aui32SsrcErrorCnt[iTrackidx]++ > 10) {
if (_aui32SsrcErrorCnt[iTrackidx]++ > 10) {
track.ssrc = rtppt.ssrc;
WarnL << "ssrc更换!";
}
return false;
}
m_aui32SsrcErrorCnt[iTrackidx] = 0;
_aui32SsrcErrorCnt[iTrackidx] = 0;
rtppt.payload[0] = '$';
rtppt.payload[1] = rtppt.interleaved;
@@ -642,30 +642,30 @@ bool RtspPlayer::handleOneRtp(int iTrackidx, unsigned char *pucData, unsigned in
memcpy(rtppt.payload + 4, pucData, uiLen);
/////////////////////////////////RTP排序逻辑///////////////////////////////////
if(rtppt.sequence != (uint16_t)(m_aui16LastSeq[iTrackidx] + 1) && m_aui16LastSeq[iTrackidx] != 0){
if(rtppt.sequence != (uint16_t)(_aui16LastSeq[iTrackidx] + 1) && _aui16LastSeq[iTrackidx] != 0){
//包乱序或丢包
m_aui64SeqOkCnt[iTrackidx] = 0;
m_abSortStarted[iTrackidx] = true;
//WarnL << "包乱序或丢包:" << trackidx <<" " << rtppt.sequence << " " << m_aui16LastSeq[trackidx];
_aui64SeqOkCnt[iTrackidx] = 0;
_abSortStarted[iTrackidx] = true;
//WarnL << "包乱序或丢包:" << trackidx <<" " << rtppt.sequence << " " << _aui16LastSeq[trackidx];
}else{
//正确序列的包
m_aui64SeqOkCnt[iTrackidx]++;
_aui64SeqOkCnt[iTrackidx]++;
}
m_aui16LastSeq[iTrackidx] = rtppt.sequence;
_aui16LastSeq[iTrackidx] = rtppt.sequence;
//开始排序缓存
if (m_abSortStarted[iTrackidx]) {
m_amapRtpSort[iTrackidx].emplace(rtppt.sequence, pt_ptr);
if (_abSortStarted[iTrackidx]) {
_amapRtpSort[iTrackidx].emplace(rtppt.sequence, pt_ptr);
GET_CONFIG_AND_REGISTER(uint32_t,clearCount,Config::Rtp::kClearCount);
GET_CONFIG_AND_REGISTER(uint32_t,maxRtpCount,Config::Rtp::kMaxRtpCount);
if (m_aui64SeqOkCnt[iTrackidx] >= clearCount) {
if (_aui64SeqOkCnt[iTrackidx] >= clearCount) {
//网络环境改善,需要清空排序缓存
m_aui64SeqOkCnt[iTrackidx] = 0;
m_abSortStarted[iTrackidx] = false;
while (m_amapRtpSort[iTrackidx].size()) {
_aui64SeqOkCnt[iTrackidx] = 0;
_abSortStarted[iTrackidx] = false;
while (_amapRtpSort[iTrackidx].size()) {
POP_HEAD(iTrackidx)
}
} else if (m_amapRtpSort[iTrackidx].size() >= maxRtpCount) {
} else if (_amapRtpSort[iTrackidx].size() >= maxRtpCount) {
//排序缓存溢出
POP_HEAD(iTrackidx)
}
@@ -678,27 +678,27 @@ bool RtspPlayer::handleOneRtp(int iTrackidx, unsigned char *pucData, unsigned in
}
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &rtppt, int trackidx){
//统计丢包率
if (m_aui16FirstSeq[trackidx] == 0 || rtppt->sequence < m_aui16FirstSeq[trackidx]) {
m_aui16FirstSeq[trackidx] = rtppt->sequence;
m_aui64RtpRecv[trackidx] = 0;
if (_aui16FirstSeq[trackidx] == 0 || rtppt->sequence < _aui16FirstSeq[trackidx]) {
_aui16FirstSeq[trackidx] = rtppt->sequence;
_aui64RtpRecv[trackidx] = 0;
}
m_aui64RtpRecv[trackidx] ++;
m_aui16NowSeq[trackidx] = rtppt->sequence;
_aui64RtpRecv[trackidx] ++;
_aui16NowSeq[trackidx] = rtppt->sequence;
if (m_aNowStampTicker[trackidx].elapsedTime() > 500) {
m_adNowStamp[trackidx] = rtppt->timeStamp;
if (_aNowStampTicker[trackidx].elapsedTime() > 500) {
_adNowStamp[trackidx] = rtppt->timeStamp;
}
onRecvRTP_l(rtppt,m_aTrackInfo[trackidx]);
onRecvRTP_l(rtppt,_aTrackInfo[trackidx]);
}
float RtspPlayer::getRtpLossRate(int iTrackType) const{
int iTrackIdx = getTrackIndexByTrackType((TrackType)iTrackType);
if(iTrackIdx == -1){
uint64_t totalRecv = 0;
uint64_t totalSend = 0;
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
totalRecv += m_aui64RtpRecv[i];
totalSend += (m_aui16NowSeq[i] - m_aui16FirstSeq[i] + 1);
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
totalRecv += _aui64RtpRecv[i];
totalSend += (_aui16NowSeq[i] - _aui16FirstSeq[i] + 1);
}
if(totalSend == 0){
return 0;
@@ -707,25 +707,25 @@ float RtspPlayer::getRtpLossRate(int iTrackType) const{
}
if(m_aui16NowSeq[iTrackIdx] - m_aui16FirstSeq[iTrackIdx] + 1 == 0){
if(_aui16NowSeq[iTrackIdx] - _aui16FirstSeq[iTrackIdx] + 1 == 0){
return 0;
}
return 1.0 - (double)m_aui64RtpRecv[iTrackIdx] / (m_aui16NowSeq[iTrackIdx] - m_aui16FirstSeq[iTrackIdx] + 1);
return 1.0 - (double)_aui64RtpRecv[iTrackIdx] / (_aui16NowSeq[iTrackIdx] - _aui16FirstSeq[iTrackIdx] + 1);
}
float RtspPlayer::getProgressTime() const{
double iTime[2] = {0,0};
for(unsigned int i = 0 ;i < m_uiTrackCnt ;i++){
if (m_aTrackInfo[i].type == TrackVideo) {
iTime[i] = (m_adNowStamp[i] - m_adFistStamp[i]) / 90000.0;
}else if (m_aTrackInfo[i].type == TrackAudio){
for(unsigned int i = 0 ;i < _uiTrackCnt ;i++){
if (_aTrackInfo[i].type == TrackVideo) {
iTime[i] = (_adNowStamp[i] - _adFistStamp[i]) / 90000.0;
}else if (_aTrackInfo[i].type == TrackAudio){
//todo(xzl) 修复此处
#if 0
iTime[i] = (m_adNowStamp[i] - m_adFistStamp[i]) / getAudioSampleRate();
iTime[i] = (_adNowStamp[i] - _adFistStamp[i]) / getAudioSampleRate();
#endif
}
}
return m_fSeekTo + MAX(iTime[0],iTime[1]);
return _fSeekTo + MAX(iTime[0],iTime[1]);
}
void RtspPlayer::seekToTime(float fTime) {
sendPause(false,fTime);
@@ -733,9 +733,9 @@ void RtspPlayer::seekToTime(float fTime) {
bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrCaseMap &header_const) {
auto header = header_const;
header.emplace("CSeq",StrPrinter << m_uiCseq++);
if(!m_strSession.empty()){
header.emplace("Session",m_strSession);
header.emplace("CSeq",StrPrinter << _uiCseq++);
if(!_strSession.empty()){
header.emplace("Session",_strSession);
}
if(!(*this)[kRtspRealm].empty() && !(*this)[PlayerBase::kRtspUser].empty()){
@@ -782,28 +782,28 @@ bool RtspPlayer::sendRtspRequest(const string &cmd, const string &url,const StrC
void RtspPlayer::onShutdown_l(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what();
m_pPlayTimer.reset();
m_pRtpTimer.reset();
m_pBeatTimer.reset();
_pPlayTimer.reset();
_pRtpTimer.reset();
_pBeatTimer.reset();
onShutdown(ex);
}
void RtspPlayer::onRecvRTP_l(const RtpPacket::Ptr &pRtppt, const RtspTrack &track) {
m_rtpTicker.resetTime();
_rtpTicker.resetTime();
onRecvRTP(pRtppt,track);
}
void RtspPlayer::onPlayResult_l(const SockException &ex) {
WarnL << ex.getErrCode() << " " << ex.what();
m_pPlayTimer.reset();
m_pRtpTimer.reset();
_pPlayTimer.reset();
_pRtpTimer.reset();
if (!ex) {
m_rtpTicker.resetTime();
_rtpTicker.resetTime();
weak_ptr<RtspPlayer> weakSelf = dynamic_pointer_cast<RtspPlayer>(shared_from_this());
m_pRtpTimer.reset( new Timer(5, [weakSelf]() {
_pRtpTimer.reset( new Timer(5, [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return false;
}
if(strongSelf->m_rtpTicker.elapsedTime()>10000) {
if(strongSelf->_rtpTicker.elapsedTime()>10000) {
//recv rtp timeout!
strongSelf->onShutdown_l(SockException(Err_timeout,"recv rtp timeout"));
strongSelf->teardown();
@@ -816,16 +816,16 @@ void RtspPlayer::onPlayResult_l(const SockException &ex) {
}
int RtspPlayer::getTrackIndexByControlSuffix(const string &controlSuffix) const{
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (m_aTrackInfo[i].controlSuffix == controlSuffix) {
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (_aTrackInfo[i].controlSuffix == controlSuffix) {
return i;
}
}
return -1;
}
int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const{
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (m_aTrackInfo[i].interleaved == interleaved) {
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (_aTrackInfo[i].interleaved == interleaved) {
return i;
}
}
@@ -833,8 +833,8 @@ int RtspPlayer::getTrackIndexByInterleaved(int interleaved) const{
}
int RtspPlayer::getTrackIndexByTrackType(TrackType trackType) const {
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (m_aTrackInfo[i].type == trackType) {
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (_aTrackInfo[i].type == trackType) {
return i;
}
}

View File

@@ -101,45 +101,45 @@ private:
bool sendDescribe();
bool sendRtspRequest(const string &cmd, const string &url ,const StrCaseMap &header = StrCaseMap());
private:
string m_strUrl;
unsigned int m_uiTrackCnt = 0;
RtspTrack m_aTrackInfo[2];
string _strUrl;
unsigned int _uiTrackCnt = 0;
RtspTrack _aTrackInfo[2];
function<void(const Parser&)> m_onHandshake;
RtspMediaSource::PoolType m_pktPool;
function<void(const Parser&)> _onHandshake;
RtspMediaSource::PoolType _pktPool;
uint8_t *m_pucRtpBuf = nullptr;
unsigned int m_uiRtpBufLen = 0;
Socket::Ptr m_apUdpSock[2];
uint8_t *_pucRtpBuf = nullptr;
unsigned int _uiRtpBufLen = 0;
Socket::Ptr _apUdpSock[2];
//rtsp info
string m_strSession;
unsigned int m_uiCseq = 1;
uint32_t m_aui32SsrcErrorCnt[2] = { 0, 0 };
string m_strContentBase;
eRtpType m_eType = RTP_TCP;
string _strSession;
unsigned int _uiCseq = 1;
uint32_t _aui32SsrcErrorCnt[2] = { 0, 0 };
string _strContentBase;
eRtpType _eType = RTP_TCP;
/* RTP包排序所用参数 */
uint16_t m_aui16LastSeq[2] = { 0 , 0 };
uint64_t m_aui64SeqOkCnt[2] = { 0 , 0};
bool m_abSortStarted[2] = { 0 , 0};
map<uint32_t , RtpPacket::Ptr> m_amapRtpSort[2];
uint16_t _aui16LastSeq[2] = { 0 , 0 };
uint64_t _aui64SeqOkCnt[2] = { 0 , 0};
bool _abSortStarted[2] = { 0 , 0};
map<uint32_t , RtpPacket::Ptr> _amapRtpSort[2];
/* 丢包率统计需要用到的参数 */
uint16_t m_aui16FirstSeq[2] = { 0 , 0};
uint16_t m_aui16NowSeq[2] = { 0 , 0 };
uint64_t m_aui64RtpRecv[2] = { 0 , 0};
uint16_t _aui16FirstSeq[2] = { 0 , 0};
uint16_t _aui16NowSeq[2] = { 0 , 0 };
uint64_t _aui64RtpRecv[2] = { 0 , 0};
//超时功能实现
Ticker m_rtpTicker;
std::shared_ptr<Timer> m_pPlayTimer;
std::shared_ptr<Timer> m_pRtpTimer;
Ticker _rtpTicker;
std::shared_ptr<Timer> _pPlayTimer;
std::shared_ptr<Timer> _pRtpTimer;
//心跳定时器
std::shared_ptr<Timer> m_pBeatTimer;
std::shared_ptr<Timer> _pBeatTimer;
//播放进度控制
float m_fSeekTo = 0;
double m_adFistStamp[2] = {0,0};
double m_adNowStamp[2] = {0,0};
Ticker m_aNowStampTicker[2];
float _fSeekTo = 0;
double _adFistStamp[2] = {0,0};
double _adNowStamp[2] = {0,0};
Ticker _aNowStampTicker[2];
};
} /* namespace Rtsp */

View File

@@ -65,33 +65,33 @@ public:
private:
//派生类回调函数
bool onCheckSDP(const string &sdp, const RtspTrack *track, int trackCnt) override {
m_pRtspMediaSrc = dynamic_pointer_cast<RtspMediaSource>(m_pMediaSrc);
if(m_pRtspMediaSrc){
m_pRtspMediaSrc->onGetSDP(sdp);
_pRtspMediaSrc = dynamic_pointer_cast<RtspMediaSource>(_pMediaSrc);
if(_pRtspMediaSrc){
_pRtspMediaSrc->onGetSDP(sdp);
}
try {
m_parser.reset(new RtpParser(sdp));
_parser.reset(new RtpParser(sdp));
//todo(xzl) 修复此处
// m_parser->setOnVideoCB(m_onGetVideoCB);
// m_parser->setOnAudioCB(m_onGetAudioCB);
// _parser->setOnVideoCB(_onGetVideoCB);
// _parser->setOnAudioCB(_onGetAudioCB);
return true;
} catch (std::exception &ex) {
WarnL << ex.what();
return m_pRtspMediaSrc ? true : false;
return _pRtspMediaSrc ? true : false;
}
}
void onRecvRTP(const RtpPacket::Ptr &rtppt, const RtspTrack &track) override {
if(m_parser){
m_parser->inputRtp(rtppt);
if(_parser){
_parser->inputRtp(rtppt);
}
if(m_pRtspMediaSrc){
m_pRtspMediaSrc->onGetRTP(rtppt,true);
if(_pRtspMediaSrc){
_pRtspMediaSrc->onGetRTP(rtppt,true);
}
}
private:
RtspMediaSource::Ptr m_pRtspMediaSrc;
RtspMediaSource::Ptr _pRtspMediaSrc;
};

View File

@@ -57,7 +57,7 @@ unordered_map<void *, std::shared_ptr<RtspSession> > RtspSession::g_mapPostter;
recursive_mutex RtspSession::g_mtxGetter; //对quicktime上锁保护
recursive_mutex RtspSession::g_mtxPostter; //对quicktime上锁保护
RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
TcpSession(pTh, pSock), m_pSender(pSock) {
TcpSession(pTh, pSock), _pSender(pSock) {
//设置10秒发送缓存
pSock->setSendBufSecond(10);
//设置15秒发送超时时间
@@ -67,8 +67,8 @@ RtspSession::RtspSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::P
}
RtspSession::~RtspSession() {
if (m_onDestory) {
m_onDestory();
if (_onDestory) {
_onDestory();
}
DebugL << get_peer_ip();
}
@@ -80,34 +80,34 @@ void RtspSession::shutdown_l(bool close){
if (_sock) {
_sock->emitErr(SockException(Err_other, "self shutdown"),close);
}
if (m_bBase64need && !_sock) {
if (_bBase64need && !_sock) {
//quickTime http postter,and self is detached from tcpServer
lock_guard<recursive_mutex> lock(g_mtxPostter);
g_mapPostter.erase(this);
}
if (m_pBrdcaster) {
m_pBrdcaster->setDetachCB(this, nullptr);
m_pBrdcaster.reset();
if (_pBrdcaster) {
_pBrdcaster->setDetachCB(this, nullptr);
_pBrdcaster.reset();
}
if (m_pRtpReader) {
m_pRtpReader.reset();
if (_pRtpReader) {
_pRtpReader.reset();
}
}
void RtspSession::onError(const SockException& err) {
TraceL << err.getErrCode() << " " << err.what();
if (m_bListenPeerUdpData) {
if (_bListenPeerUdpData) {
//取消UDP端口监听
UDPServer::Instance().stopListenPeer(get_peer_ip().data(), this);
m_bListenPeerUdpData = false;
_bListenPeerUdpData = false;
}
if (!m_bBase64need && m_strSessionCookie.size() != 0) {
if (!_bBase64need && _strSessionCookie.size() != 0) {
//quickTime http getter
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter.erase(m_strSessionCookie);
g_mapGetter.erase(_strSessionCookie);
}
if (m_bBase64need && err.getErrCode() == Err_eof) {
if (_bBase64need && err.getErrCode() == Err_eof) {
//quickTime http postter,正在发送rtp; QuickTime只是断开了请求连接,请继续发送rtp
_sock = nullptr;
lock_guard<recursive_mutex> lock(g_mtxPostter);
@@ -121,24 +121,24 @@ void RtspSession::onError(const SockException& err) {
//流量统计事件广播
GET_CONFIG_AND_REGISTER(uint32_t,iFlowThreshold,Broadcast::kFlowThreshold);
if(m_ui64TotalBytes > iFlowThreshold * 1024){
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport,
m_mediaInfo,
m_ui64TotalBytes,
m_ticker.createdTime()/1000,
_mediaInfo,
_ui64TotalBytes,
_ticker.createdTime()/1000,
*this);
}
}
void RtspSession::onManager() {
if (m_ticker.createdTime() > 15 * 1000) {
if (m_strSession.size() == 0) {
if (_ticker.createdTime() > 15 * 1000) {
if (_strSession.size() == 0) {
WarnL << "非法链接:" << get_peer_ip();
shutdown();
return;
}
}
if (m_rtpType != PlayerBase::RTP_TCP && m_ticker.elapsedTime() > 15 * 1000) {
if (_rtpType != PlayerBase::RTP_TCP && _ticker.elapsedTime() > 15 * 1000) {
WarnL << "RTSP会话超时:" << get_peer_ip();
shutdown();
return;
@@ -148,11 +148,11 @@ void RtspSession::onManager() {
int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
char tmp[2 * 1024];
m_pcBuf = tmp;
_pcBuf = tmp;
m_parser.Parse(header); //rtsp请求解析
string strCmd = m_parser.Method(); //提取出请求命令字
m_iCseq = atoi(m_parser["CSeq"].data());
_parser.Parse(header); //rtsp请求解析
string strCmd = _parser.Method(); //提取出请求命令字
_iCseq = atoi(_parser["CSeq"].data());
typedef bool (RtspSession::*rtspCMDHandle)();
static unordered_map<string, rtspCMDHandle> g_mapCmd;
@@ -180,15 +180,15 @@ int64_t RtspSession::onRecvHeader(const char *header,uint64_t len) {
WarnL << "cmd=" << strCmd;
}
m_parser.Clear();
_parser.Clear();
return 0;
}
void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
m_ticker.resetTime();
m_ui64TotalBytes += pBuf->size();
if (m_bBase64need) {
_ticker.resetTime();
_ui64TotalBytes += pBuf->size();
if (_bBase64need) {
//quicktime 加密后的rtsp请求需要解密
auto str = decodeBase64(string(pBuf->data(),pBuf->size()));
inputRtspOrRtcp(str.data(),str.size());
@@ -198,7 +198,7 @@ void RtspSession::onRecv(const Buffer::Ptr &pBuf) {
}
void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
if(data[0] == '$' && m_rtpType == PlayerBase::RTP_TCP){
if(data[0] == '$' && _rtpType == PlayerBase::RTP_TCP){
//这是rtcp
return;
}
@@ -207,24 +207,24 @@ void RtspSession::inputRtspOrRtcp(const char *data,uint64_t len) {
bool RtspSession::handleReq_Options() {
//支持这些命令
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Public: OPTIONS, DESCRIBE, SETUP, TEARDOWN, PLAY,"
" PAUSE, SET_PARAMETER, GET_PARAMETER\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
return true;
}
bool RtspSession::handleReq_Describe() {
{
//解析url获取媒体名称
m_strUrl = m_parser.Url();
m_mediaInfo.parse(m_parser.FullUrl());
_strUrl = _parser.Url();
_mediaInfo.parse(_parser.FullUrl());
}
if (!findStream()) {
@@ -235,7 +235,7 @@ bool RtspSession::handleReq_Describe() {
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
//该请求中的认证信息
auto authorization = m_parser["Authorization"];
auto authorization = _parser["Authorization"];
onGetRealm invoker = [weakSelf,authorization](const string &realm){
if(realm.empty()){
//无需认证,回复sdp
@@ -248,7 +248,7 @@ bool RtspSession::handleReq_Describe() {
//广播是否需要认证事件
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnGetRtspRealm,
m_mediaInfo,
_mediaInfo,
invoker,
*this)){
//无人监听此事件,说明无需认证
@@ -279,10 +279,10 @@ void RtspSession::onAuthSuccess(const weak_ptr<RtspSession> &weakSelf) {
"Content-Base: %s/\r\n"
"Content-Type: application/sdp\r\n"
"Content-Length: %d\r\n\r\n%s",
strongSelf->m_iCseq, SERVER_NAME,
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strongSelf->m_strUrl.data(),
(int) strongSelf->m_strSdp.length(), strongSelf->m_strSdp.data());
dateHeader().data(), strongSelf->_strUrl.data(),
(int) strongSelf->_strSdp.length(), strongSelf->_strSdp.data());
strongSelf->SocketHelper::send(response, n);
});
}
@@ -304,16 +304,16 @@ void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const strin
GET_CONFIG_AND_REGISTER(bool,authBasic,Config::Rtsp::kAuthBasic);
if (!authBasic) {
//我们需要客户端优先以md5方式认证
strongSelf->m_strNonce = makeRandStr(32);
strongSelf->_strNonce = makeRandStr(32);
n = sprintf(response,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Digest realm=\"%s\",nonce=\"%s\"\r\n\r\n",
strongSelf->m_iCseq, SERVER_NAME,
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data(), strongSelf->m_strNonce.data());
dateHeader().data(), realm.data(), strongSelf->_strNonce.data());
}else {
//当然我们也支持base64认证,但是我们不建议这样做
n = sprintf(response,
@@ -322,7 +322,7 @@ void RtspSession::onAuthFailed(const weak_ptr<RtspSession> &weakSelf,const strin
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"WWW-Authenticate: Basic realm=\"%s\"\r\n\r\n",
strongSelf->m_iCseq, SERVER_NAME,
strongSelf->_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), realm.data());
}
@@ -359,7 +359,7 @@ void RtspSession::onAuthBasic(const weak_ptr<RtspSession> &weakSelf,const string
}
//此时必须提供明文密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->m_mediaInfo,user, true,invoker,*strongSelf)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,user, true,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@@ -388,8 +388,8 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
}
//check nonce
auto nonce = map["nonce"];
if(strongSelf->m_strNonce != nonce){
TraceL << "nonce not mached:" << nonce << "," << strongSelf->m_strNonce;
if(strongSelf->_strNonce != nonce){
TraceL << "nonce not mached:" << nonce << "," << strongSelf->_strNonce;
onAuthFailed(weakSelf,realm);
return ;
}
@@ -440,7 +440,7 @@ void RtspSession::onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const strin
};
//此时可以提供明文或md5加密的密码
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->m_mediaInfo,username, false,invoker,*strongSelf)){
if(!NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastOnRtspAuth,strongSelf->_mediaInfo,username, false,invoker,*strongSelf)){
//表明该流需要认证却没监听请求密码事件,这一般是大意的程序所为,警告之
WarnL << "请监听kBroadcastOnRtspAuth事件";
//但是我们还是忽略认证以便完成播放
@@ -469,80 +469,80 @@ void RtspSession::onAuthUser(const weak_ptr<RtspSession> &weakSelf,const string
}
}
inline void RtspSession::send_StreamNotFound() {
int n = sprintf(m_pcBuf, "RTSP/1.0 404 Stream Not Found\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 404 Stream Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
}
inline void RtspSession::send_UnsupportedTransport() {
int n = sprintf(m_pcBuf, "RTSP/1.0 461 Unsupported Transport\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 461 Unsupported Transport\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
}
inline void RtspSession::send_SessionNotFound() {
int n = sprintf(m_pcBuf, "RTSP/1.0 454 Session Not Found\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 454 Session Not Found\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
/*40 Method Not Allowed*/
}
bool RtspSession::handleReq_Setup() {
//处理setup命令该函数可能进入多次
auto controlSuffix = m_parser.FullUrl().substr(1 + m_parser.FullUrl().rfind('/'));
auto controlSuffix = _parser.FullUrl().substr(1 + _parser.FullUrl().rfind('/'));
int trackIdx = getTrackIndexByControlSuffix(controlSuffix);
if (trackIdx == -1) {
//未找到相应track
return false;
}
RtspTrack &trackRef = m_aTrackInfo[trackIdx];
RtspTrack &trackRef = _aTrackInfo[trackIdx];
if (trackRef.inited) {
//已经初始化过该Track
return false;
}
trackRef.inited = true; //现在初始化
auto strongRing = m_pWeakRing.lock();
auto strongRing = _pWeakRing.lock();
if (!strongRing) {
//the media source is released!
send_NotAcceptable();
return false;
}
if(!m_bSetUped){
m_bSetUped = true;
auto strTransport = m_parser["Transport"];
if(!_bSetUped){
_bSetUped = true;
auto strTransport = _parser["Transport"];
if(strTransport.find("TCP") != string::npos){
m_rtpType = PlayerBase::RTP_TCP;
_rtpType = PlayerBase::RTP_TCP;
}else if(strTransport.find("multicast") != string::npos){
m_rtpType = PlayerBase::RTP_MULTICAST;
_rtpType = PlayerBase::RTP_MULTICAST;
}else{
m_rtpType = PlayerBase::RTP_UDP;
_rtpType = PlayerBase::RTP_UDP;
}
}
if (!m_pRtpReader && m_rtpType != PlayerBase::RTP_MULTICAST) {
m_pRtpReader = strongRing->attach();
if (!_pRtpReader && _rtpType != PlayerBase::RTP_MULTICAST) {
_pRtpReader = strongRing->attach();
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
m_pRtpReader->setDetachCB([weakSelf]() {
_pRtpReader->setDetachCB([weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
@@ -551,9 +551,9 @@ bool RtspSession::handleReq_Setup() {
});
}
switch (m_rtpType) {
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
int iLen = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int iLen = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
@@ -562,13 +562,13 @@ bool RtspSession::handleReq_Setup() {
"Session: %s\r\n"
"x-Transport-Options: late-tolerance=1.400000\r\n"
"x-Dynamic-Rate: 1\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), trackRef.type * 2,
trackRef.type * 2 + 1,
printSSRC(trackRef.ssrc).data(),
m_strSession.data());
SocketHelper::send(m_pcBuf, iLen);
_strSession.data());
SocketHelper::send(_pcBuf, iLen);
}
break;
case PlayerBase::RTP_UDP: {
@@ -587,44 +587,44 @@ bool RtspSession::handleReq_Setup() {
send_NotAcceptable();
return false;
}
m_apUdpSock[trackIdx] = pSockRtp;
_apUdpSock[trackIdx] = pSockRtp;
//设置客户端内网端口信息
string strClientPort = FindField(m_parser["Transport"].data(), "client_port=", NULL);
string strClientPort = FindField(_parser["Transport"].data(), "client_port=", NULL);
uint16_t ui16PeerPort = atoi( FindField(strClientPort.data(), NULL, "-").data());
struct sockaddr_in peerAddr;
peerAddr.sin_family = AF_INET;
peerAddr.sin_port = htons(ui16PeerPort);
peerAddr.sin_addr.s_addr = inet_addr(get_peer_ip().data());
bzero(&(peerAddr.sin_zero), sizeof peerAddr.sin_zero);
m_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
_apPeerUdpAddr[trackIdx].reset((struct sockaddr *) (new struct sockaddr_in(peerAddr)));
//尝试获取客户端nat映射地址
startListenPeerUdpData();
//InfoL << "分配端口:" << srv_port;
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP/UDP;unicast;"
"client_port=%s;server_port=%d-%d;ssrc=%s;mode=play\r\n"
"Session: %s\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), strClientPort.data(),
pSockRtp->get_local_port(), pSockRtcp->get_local_port(),
printSSRC(trackRef.ssrc).data(),
m_strSession.data());
SocketHelper::send(m_pcBuf, n);
_strSession.data());
SocketHelper::send(_pcBuf, n);
}
break;
case PlayerBase::RTP_MULTICAST: {
if(!m_pBrdcaster){
m_pBrdcaster = RtpBroadCaster::get(get_local_ip(),m_mediaInfo.m_vhost, m_mediaInfo.m_app, m_mediaInfo.m_streamid);
if (!m_pBrdcaster) {
if(!_pBrdcaster){
_pBrdcaster = RtpBroadCaster::get(get_local_ip(),_mediaInfo._vhost, _mediaInfo._app, _mediaInfo._streamid);
if (!_pBrdcaster) {
send_NotAcceptable();
return false;
}
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
m_pBrdcaster->setDetachCB(this, [weakSelf]() {
_pBrdcaster->setDetachCB(this, [weakSelf]() {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
@@ -632,7 +632,7 @@ bool RtspSession::handleReq_Setup() {
strongSelf->safeShutdown();
});
}
int iSrvPort = m_pBrdcaster->getPort(trackRef.type);
int iSrvPort = _pBrdcaster->getPort(trackRef.type);
//我们用trackIdx区分rtp和rtcp包
auto pSockRtcp = UDPServer::Instance().getSock(get_local_ip().data(),2*trackIdx + 1,iSrvPort + 1);
if (!pSockRtcp) {
@@ -643,20 +643,20 @@ bool RtspSession::handleReq_Setup() {
}
startListenPeerUdpData();
GET_CONFIG_AND_REGISTER(uint32_t,udpTTL,MultiCast::kUdpTTL);
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Transport: RTP/AVP;multicast;destination=%s;"
"source=%s;port=%d-%d;ttl=%d;ssrc=%s\r\n"
"Session: %s\r\n\r\n",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_pBrdcaster->getIP().data(),
dateHeader().data(), _pBrdcaster->getIP().data(),
get_local_ip().data(), iSrvPort, pSockRtcp->get_local_port(),
udpTTL,printSSRC(trackRef.ssrc).data(),
m_strSession.data());
SocketHelper::send(m_pcBuf, n);
_strSession.data());
SocketHelper::send(_pcBuf, n);
}
break;
default:
@@ -666,39 +666,39 @@ bool RtspSession::handleReq_Setup() {
}
bool RtspSession::handleReq_Play() {
if (m_uiTrackCnt == 0) {
if (_uiTrackCnt == 0) {
//还没有Describe
return false;
}
if (m_parser["Session"] != m_strSession) {
if (_parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
}
auto strRange = m_parser["Range"];
auto strRange = _parser["Range"];
auto onRes = [this,strRange](const string &err){
bool authSuccess = err.empty();
char response[2 * 1024];
m_pcBuf = response;
if(!authSuccess && m_bFirstPlay){
_pcBuf = response;
if(!authSuccess && _bFirstPlay){
//第一次play是播放否则是恢复播放。只对播放鉴权
int n = sprintf(m_pcBuf,
int n = sprintf(_pcBuf,
"RTSP/1.0 401 Unauthorized\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Content-Type: text/plain\r\n"
"Content-Length: %d\r\n\r\n%s",
m_iCseq, SERVER_NAME,
_iCseq, SERVER_NAME,
RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(),(int)err.size(),err.data());
SocketHelper::send(m_pcBuf,n);
SocketHelper::send(_pcBuf,n);
shutdown();
return;
}
if(m_pRtpReader){
if(_pRtpReader){
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
SockUtil::setNoDelay(m_pSender->rawFD(), false);
m_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
SockUtil::setNoDelay(_pSender->rawFD(), false);
_pRtpReader->setReadCB([weakSelf](const RtpPacket::Ptr &pack) {
auto strongSelf = weakSelf.lock();
if(!strongSelf) {
return;
@@ -714,10 +714,10 @@ bool RtspSession::handleReq_Play() {
});
}
auto pMediaSrc = m_pMediaSrc.lock();
auto pMediaSrc = _pMediaSrc.lock();
uint32_t iStamp = 0;
if(pMediaSrc){
if (strRange.size() && !m_bFirstPlay) {
if (strRange.size() && !_bFirstPlay) {
auto strStart = FindField(strRange.data(), "npt=", "-");
if (strStart == "now") {
strStart = "0";
@@ -733,41 +733,41 @@ bool RtspSession::handleReq_Play() {
}else{
iStamp = pMediaSrc->getStamp();
}
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
auto &track = m_aTrackInfo[i];
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
auto &track = _aTrackInfo[i];
track.ssrc = pMediaSrc->getSsrc(track.type);
track.seq = pMediaSrc->getSeqence(track.type);
track.timeStamp = pMediaSrc->getTimestamp(track.type);
}
}
m_bFirstPlay = false;
int iLen = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
_bFirstPlay = false;
int iLen = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n"
"Range: npt=%.2f-\r\n"
"RTP-Info: ", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_strSession.data(),iStamp/1000.0);
"RTP-Info: ", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data(),iStamp/1000.0);
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
auto &track = m_aTrackInfo[i];
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
auto &track = _aTrackInfo[i];
if (track.inited == false) {
//还有track没有setup
shutdown();
return;
}
iLen += sprintf(m_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
m_strUrl.data(), track.controlSuffix.data(), track.seq,track.timeStamp);
iLen += sprintf(_pcBuf + iLen, "url=%s/%s;seq=%d;rtptime=%u,",
_strUrl.data(), track.controlSuffix.data(), track.seq,track.timeStamp);
}
iLen -= 1;
(m_pcBuf)[iLen] = '\0';
iLen += sprintf(m_pcBuf + iLen, "\r\n\r\n");
SocketHelper::send(m_pcBuf, iLen);
(_pcBuf)[iLen] = '\0';
iLen += sprintf(_pcBuf + iLen, "\r\n\r\n");
SocketHelper::send(_pcBuf, iLen);
//提高发送性能
(*this) << SocketFlags(kSockFlags);
SockUtil::setNoDelay(m_pSender->rawFD(),false);
SockUtil::setNoDelay(_pSender->rawFD(),false);
};
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
@@ -784,7 +784,7 @@ bool RtspSession::handleReq_Play() {
onRes(err);
});
};
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,m_mediaInfo,invoker,*this);
auto flag = NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,_mediaInfo,invoker,*this);
if(!flag){
//该事件无人监听,默认不鉴权
onRes("");
@@ -793,40 +793,40 @@ bool RtspSession::handleReq_Play() {
}
bool RtspSession::handleReq_Pause() {
if (m_parser["Session"] != m_strSession) {
if (_parser["Session"] != _strSession) {
send_SessionNotFound();
return false;
}
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_strSession.data());
SocketHelper::send(m_pcBuf, n);
if(m_pRtpReader){
m_pRtpReader->setReadCB(nullptr);
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
if(_pRtpReader){
_pRtpReader->setReadCB(nullptr);
}
return true;
}
bool RtspSession::handleReq_Teardown() {
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_strSession.data());
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
TraceL << "播放器断开连接!";
return false;
}
bool RtspSession::handleReq_Get() {
m_strSessionCookie = m_parser["x-sessioncookie"];
int n = sprintf(m_pcBuf, "HTTP/1.0 200 OK\r\n"
_strSessionCookie = _parser["x-sessioncookie"];
int n = sprintf(_pcBuf, "HTTP/1.0 200 OK\r\n"
"%s"
"Connection: close\r\n"
"Cache-Control: no-store\r\n"
@@ -835,23 +835,23 @@ bool RtspSession::handleReq_Get() {
dateHeader().data());
//注册GET
lock_guard<recursive_mutex> lock(g_mtxGetter);
g_mapGetter[m_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << m_strSessionCookie;
SocketHelper::send(m_pcBuf, n);
g_mapGetter[_strSessionCookie] = dynamic_pointer_cast<RtspSession>(shared_from_this());
//InfoL << _strSessionCookie;
SocketHelper::send(_pcBuf, n);
return true;
}
bool RtspSession::handleReq_Post() {
lock_guard<recursive_mutex> lock(g_mtxGetter);
string sessioncookie = m_parser["x-sessioncookie"];
string sessioncookie = _parser["x-sessioncookie"];
//Poster 找到 Getter
auto it = g_mapGetter.find(sessioncookie);
if (it == g_mapGetter.end()) {
//WarnL << sessioncookie;
return false;
}
m_bBase64need = true;
_bBase64need = true;
//Poster 找到Getter的SOCK
auto strongSession = it->second.lock();
g_mapGetter.erase(sessioncookie);
@@ -866,46 +866,46 @@ bool RtspSession::handleReq_Post() {
bool RtspSession::handleReq_SET_PARAMETER() {
//TraceL<<endl;
int n = sprintf(m_pcBuf, "RTSP/1.0 200 OK\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 200 OK\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Session: %s\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), m_strSession.data());
SocketHelper::send(m_pcBuf, n);
"Session: %s\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data(), _strSession.data());
SocketHelper::send(_pcBuf, n);
return true;
}
inline void RtspSession::send_NotAcceptable() {
int n = sprintf(m_pcBuf, "RTSP/1.0 406 Not Acceptable\r\n"
int n = sprintf(_pcBuf, "RTSP/1.0 406 Not Acceptable\r\n"
"CSeq: %d\r\n"
"Server: %s-%0.2f(build in %s)\r\n"
"%s"
"Connection: Close\r\n\r\n", m_iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
"Connection: Close\r\n\r\n", _iCseq, SERVER_NAME, RTSP_VERSION, RTSP_BUILDTIME,
dateHeader().data());
SocketHelper::send(m_pcBuf, n);
SocketHelper::send(_pcBuf, n);
}
inline bool RtspSession::findStream() {
RtspMediaSource::Ptr pMediaSrc =
dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,m_mediaInfo.m_vhost, m_mediaInfo.m_app,m_mediaInfo.m_streamid) );
dynamic_pointer_cast<RtspMediaSource>( MediaSource::find(RTSP_SCHEMA,_mediaInfo._vhost, _mediaInfo._app,_mediaInfo._streamid) );
if (!pMediaSrc) {
WarnL << "No such stream:" << m_mediaInfo.m_vhost << " " << m_mediaInfo.m_app << " " << m_mediaInfo.m_streamid;
WarnL << "No such stream:" << _mediaInfo._vhost << " " << _mediaInfo._app << " " << _mediaInfo._streamid;
return false;
}
m_strSdp = pMediaSrc->getSdp();
m_pWeakRing = pMediaSrc->getRing();
_strSdp = pMediaSrc->getSdp();
_pWeakRing = pMediaSrc->getRing();
m_uiTrackCnt = parserSDP(m_strSdp, m_aTrackInfo);
if (m_uiTrackCnt == 0 || m_uiTrackCnt > 2) {
_uiTrackCnt = parserSDP(_strSdp, _aTrackInfo);
if (_uiTrackCnt == 0 || _uiTrackCnt > 2) {
return false;
}
m_strSession = makeRandStr(12);
m_pMediaSrc = pMediaSrc;
_strSession = makeRandStr(12);
_pMediaSrc = pMediaSrc;
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
auto &track = m_aTrackInfo[i];
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
auto &track = _aTrackInfo[i];
track.ssrc = pMediaSrc->getSsrc(track.type);
track.seq = pMediaSrc->getSeqence(track.type);
track.timeStamp = pMediaSrc->getTimestamp(track.type);
@@ -917,19 +917,19 @@ inline bool RtspSession::findStream() {
inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
//InfoL<<(int)pkt.Interleaved;
switch (m_rtpType) {
switch (_rtpType) {
case PlayerBase::RTP_TCP: {
BufferRtp::Ptr buffer(new BufferRtp(pkt));
send(buffer);
#ifdef RTSP_SEND_RTCP
int iTrackIndex = getTrackIndexByTrackId(pkt.interleaved / 2);
RtcpCounter &counter = m_aRtcpCnt[iTrackIndex];
RtcpCounter &counter = _aRtcpCnt[iTrackIndex];
counter.pktCnt += 1;
counter.octCount += (pkt.length - 12);
auto &m_ticker = m_aRtcpTicker[iTrackIndex];
if (m_ticker.elapsedTime() > 5 * 1000) {
auto &_ticker = _aRtcpTicker[iTrackIndex];
if (_ticker.elapsedTime() > 5 * 1000) {
//send rtcp every 5 second
m_ticker.resetTime();
_ticker.resetTime();
counter.timeStamp = pkt.timeStamp;
sendRTCP();
}
@@ -938,17 +938,17 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
break;
case PlayerBase::RTP_UDP: {
int iTrackIndex = getTrackIndexByTrackType(pkt->type);
auto pSock = m_apUdpSock[iTrackIndex].lock();
auto pSock = _apUdpSock[iTrackIndex].lock();
if (!pSock) {
shutdown();
return;
}
auto peerAddr = m_apPeerUdpAddr[iTrackIndex];
auto peerAddr = _apPeerUdpAddr[iTrackIndex];
if (!peerAddr) {
return;
}
BufferRtp::Ptr buffer(new BufferRtp(pkt,4));
m_ui64TotalBytes += buffer->size();
_ui64TotalBytes += buffer->size();
pSock->send(buffer,kSockFlags, peerAddr.get());
}
break;
@@ -960,35 +960,35 @@ inline void RtspSession::sendRtpPacket(const RtpPacket::Ptr & pkt) {
inline void RtspSession::onRcvPeerUdpData(int iTrackIdx, const Buffer::Ptr &pBuf, const struct sockaddr& addr) {
if(iTrackIdx % 2 == 0){
//这是rtp探测包
if(!m_bGotAllPeerUdp){
if(!_bGotAllPeerUdp){
//还没有获取完整的rtp探测包
if(SockUtil::in_same_lan(get_local_ip().data(),get_peer_ip().data())){
//在内网中客户端上报的端口号是真实的所以我们忽略udp打洞包
m_bGotAllPeerUdp = true;
_bGotAllPeerUdp = true;
return;
}
//设置真实的客户端nat映射端口号
m_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
m_abGotPeerUdp[iTrackIdx / 2] = true;
m_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (!m_abGotPeerUdp[i]) {
_apPeerUdpAddr[iTrackIdx / 2].reset(new struct sockaddr(addr));
_abGotPeerUdp[iTrackIdx / 2] = true;
_bGotAllPeerUdp = true;//先假设获取到完整的rtp探测包
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (!_abGotPeerUdp[i]) {
//还有track没获取到rtp探测包
m_bGotAllPeerUdp = false;
_bGotAllPeerUdp = false;
break;
}
}
}
}else{
//这是rtcp心跳包说明播放器还存活
m_ticker.resetTime();
_ticker.resetTime();
//TraceL << "rtcp:" << (iTrackIdx-1)/2 ;
}
}
inline void RtspSession::startListenPeerUdpData() {
m_bListenPeerUdpData = true;
_bListenPeerUdpData = true;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
UDPServer::Instance().listenPeer(get_peer_ip().data(), this,
[weakSelf](int iTrackIdx,const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr)->bool {
@@ -1009,15 +1009,15 @@ inline void RtspSession::startListenPeerUdpData() {
}
inline void RtspSession::initSender(const std::shared_ptr<RtspSession>& session) {
m_pSender = session->_sock;
_pSender = session->_sock;
weak_ptr<RtspSession> weakSelf = dynamic_pointer_cast<RtspSession>(shared_from_this());
session->m_onDestory = [weakSelf]() {
session->_onDestory = [weakSelf]() {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
}
//DebugL;
strongSelf->m_pSender->setOnErr([weakSelf](const SockException &err) {
strongSelf->_pSender->setOnErr([weakSelf](const SockException &err) {
auto strongSelf=weakSelf.lock();
if(!strongSelf) {
return;
@@ -1033,9 +1033,9 @@ inline void RtspSession::sendRTCP() {
//DebugL;
uint8_t aui8Rtcp[60] = {0};
uint8_t *pui8Rtcp_SR = aui8Rtcp + 4, *pui8Rtcp_SDES = pui8Rtcp_SR + 28;
for (uint8_t i = 0; i < m_uiTrackCnt; i++) {
auto &track = m_aTrackInfo[i];
auto &counter = m_aRtcpCnt[i];
for (uint8_t i = 0; i < _uiTrackCnt; i++) {
auto &track = _aTrackInfo[i];
auto &counter = _aRtcpCnt[i];
aui8Rtcp[0] = '$';
aui8Rtcp[1] = track.trackId * 2 + 1;

View File

@@ -90,8 +90,8 @@ protected:
private:
void inputRtspOrRtcp(const char *data,uint64_t len);
int send(const Buffer::Ptr &pkt) override{
m_ui64TotalBytes += pkt->size();
return m_pSender->send(pkt,_flags);
_ui64TotalBytes += pkt->size();
return _pSender->send(pkt,_flags);
}
void shutdown() override ;
void shutdown_l(bool close);
@@ -123,16 +123,16 @@ private:
return tmp;
}
inline int getTrackIndexByTrackType(TrackType type) {
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (type == m_aTrackInfo[i].type) {
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (type == _aTrackInfo[i].type) {
return i;
}
}
return -1;
}
inline int getTrackIndexByControlSuffix(const string &controlSuffix) {
for (unsigned int i = 0; i < m_uiTrackCnt; i++) {
if (controlSuffix == m_aTrackInfo[i].controlSuffix) {
for (unsigned int i = 0; i < _uiTrackCnt; i++) {
if (controlSuffix == _aTrackInfo[i].controlSuffix) {
return i;
}
}
@@ -150,53 +150,53 @@ private:
static void onAuthDigest(const weak_ptr<RtspSession> &weakSelf,const string &realm,const string &strMd5);
private:
char *m_pcBuf = nullptr;
Ticker m_ticker;
Parser m_parser; //rtsp解析类
string m_strUrl;
string m_strSdp;
string m_strSession;
bool m_bFirstPlay = true;
MediaInfo m_mediaInfo;
std::weak_ptr<RtspMediaSource> m_pMediaSrc;
char *_pcBuf = nullptr;
Ticker _ticker;
Parser _parser; //rtsp解析类
string _strUrl;
string _strSdp;
string _strSession;
bool _bFirstPlay = true;
MediaInfo _mediaInfo;
std::weak_ptr<RtspMediaSource> _pMediaSrc;
//RTP缓冲
weak_ptr<RingBuffer<RtpPacket::Ptr> > m_pWeakRing;
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr m_pRtpReader;
weak_ptr<RingBuffer<RtpPacket::Ptr> > _pWeakRing;
RingBuffer<RtpPacket::Ptr>::RingReader::Ptr _pRtpReader;
PlayerBase::eRtpType m_rtpType = PlayerBase::RTP_UDP;
bool m_bSetUped = false;
int m_iCseq = 0;
unsigned int m_uiTrackCnt = 0; //媒体track个数
RtspTrack m_aTrackInfo[2]; //媒体track信息,trackid idx 为数组下标
bool m_bGotAllPeerUdp = false;
PlayerBase::eRtpType _rtpType = PlayerBase::RTP_UDP;
bool _bSetUped = false;
int _iCseq = 0;
unsigned int _uiTrackCnt = 0; //媒体track个数
RtspTrack _aTrackInfo[2]; //媒体track信息,trackid idx 为数组下标
bool _bGotAllPeerUdp = false;
#ifdef RTSP_SEND_RTCP
RtcpCounter m_aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标
Ticker m_aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标
RtcpCounter _aRtcpCnt[2]; //rtcp统计,trackid idx 为数组下标
Ticker _aRtcpTicker[2]; //rtcp发送时间,trackid idx 为数组下标
inline void sendRTCP();
#endif
//RTP over UDP
bool m_abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数
weak_ptr<Socket> m_apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> m_apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
bool m_bListenPeerUdpData = false;
RtpBroadCaster::Ptr m_pBrdcaster;
bool _abGotPeerUdp[2] = { false, false }; //获取客户端udp端口计数
weak_ptr<Socket> _apUdpSock[2]; //发送RTP的UDP端口,trackid idx 为数组下标
std::shared_ptr<struct sockaddr> _apPeerUdpAddr[2]; //播放器接收RTP的地址,trackid idx 为数组下标
bool _bListenPeerUdpData = false;
RtpBroadCaster::Ptr _pBrdcaster;
//登录认证
string m_strNonce;
string _strNonce;
//RTSP over HTTP
function<void(void)> m_onDestory;
bool m_bBase64need = false; //是否需要base64解码
Socket::Ptr m_pSender; //回复rtsp时走的tcp通道供quicktime用
function<void(void)> _onDestory;
bool _bBase64need = false; //是否需要base64解码
Socket::Ptr _pSender; //回复rtsp时走的tcp通道供quicktime用
//quicktime 请求rtsp会产生两次tcp连接
//一次发送 get 一次发送post需要通过sessioncookie关联起来
string m_strSessionCookie;
string _strSessionCookie;
//消耗的总流量
uint64_t m_ui64TotalBytes = 0;
uint64_t _ui64TotalBytes = 0;
static recursive_mutex g_mtxGetter; //对quicktime上锁保护
static recursive_mutex g_mtxPostter; //对quicktime上锁保护

View File

@@ -43,7 +43,7 @@ RtspToRtmpMediaSource::RtspToRtmpMediaSource(const string &vhost,
const string &id,
bool bEnableHls,
bool bEnableMp4) :
RtspMediaSource(vhost,app,id),m_bEnableHls(bEnableHls),m_bEnableMp4(bEnableMp4) {
RtspMediaSource(vhost,app,id),_bEnableHls(bEnableHls),_bEnableMp4(bEnableMp4) {
}
RtspToRtmpMediaSource::~RtspToRtmpMediaSource() {
@@ -65,36 +65,36 @@ void RtspToRtmpMediaSource::makeVideoConfigPkt() {
rtmpPkt->strBuf.push_back(1); // version
//todo(xzl) 修复此处
string m_sps ;//= m_pParser->getSps().substr(4);
string m_pps ;//= m_pParser->getPps().substr(4);
//DebugL<<hexdump(m_sps.data(), m_sps.size());
rtmpPkt->strBuf.push_back(m_sps[1]); // profile
rtmpPkt->strBuf.push_back(m_sps[2]); // compat
rtmpPkt->strBuf.push_back(m_sps[3]); // level
string _sps ;//= _pParser->getSps().substr(4);
string _pps ;//= _pParser->getPps().substr(4);
//DebugL<<hexdump(_sps.data(), _sps.size());
rtmpPkt->strBuf.push_back(_sps[1]); // profile
rtmpPkt->strBuf.push_back(_sps[2]); // compat
rtmpPkt->strBuf.push_back(_sps[3]); // level
rtmpPkt->strBuf.push_back(0xff); // 6 bits reserved + 2 bits nal size length - 1 (11)
rtmpPkt->strBuf.push_back(0xe1); // 3 bits reserved + 5 bits number of sps (00001)
uint16_t size = m_sps.size();
uint16_t size = _sps.size();
size = htons(size);
rtmpPkt->strBuf.append((char *) &size, 2);
rtmpPkt->strBuf.append(m_sps);
rtmpPkt->strBuf.append(_sps);
/////////////pps
rtmpPkt->strBuf.push_back(1); // version
size = m_pps.size();
size = _pps.size();
size = htons(size);
rtmpPkt->strBuf.append((char *) &size, 2);
rtmpPkt->strBuf.append(m_pps);
rtmpPkt->strBuf.append(_pps);
rtmpPkt->bodySize = rtmpPkt->strBuf.size();
rtmpPkt->chunkId = CHUNK_VIDEO;
rtmpPkt->streamId = STREAM_MEDIA;
rtmpPkt->timeStamp = 0;
rtmpPkt->typeId = MSG_VIDEO;
m_pRtmpSrc->onGetMedia(rtmpPkt);
_pRtmpSrc->onGetMedia(rtmpPkt);
}
void RtspToRtmpMediaSource::onGetH264(const H264Frame& frame) {
if(m_pRecorder){
m_pRecorder->inputH264((char *) frame.data(), frame.size(), frame.timeStamp, frame.type);
if(_pRecorder){
_pRecorder->inputH264((char *) frame.data(), frame.size(), frame.timeStamp, frame.type);
}
uint8_t nal_type = frame.data()[4] & 0x1F;
int8_t flags = 7; //h.264
@@ -124,17 +124,17 @@ void RtspToRtmpMediaSource::onGetH264(const H264Frame& frame) {
rtmpPkt->streamId = STREAM_MEDIA;
rtmpPkt->timeStamp = frame.timeStamp;
rtmpPkt->typeId = MSG_VIDEO;
m_pRtmpSrc->onGetMedia(rtmpPkt);
_pRtmpSrc->onGetMedia(rtmpPkt);
}
void RtspToRtmpMediaSource::onGetAAC(const AACFrame& frame) {
if(m_pRecorder){
m_pRecorder->inputAAC((char *) frame.buffer, frame.aac_frame_length, frame.timeStamp);
if(_pRecorder){
_pRecorder->inputAAC((char *) frame.buffer, frame.aac_frame_length, frame.timeStamp);
}
RtmpPacket::Ptr rtmpPkt(new RtmpPacket);
//////////header
uint8_t is_config = false;
rtmpPkt->strBuf.push_back(m_ui8AudioFlags);
rtmpPkt->strBuf.push_back(_ui8AudioFlags);
rtmpPkt->strBuf.push_back(!is_config);
rtmpPkt->strBuf.append((char *) frame.buffer + 7, frame.aac_frame_length - 7);
@@ -143,15 +143,15 @@ void RtspToRtmpMediaSource::onGetAAC(const AACFrame& frame) {
rtmpPkt->streamId = STREAM_MEDIA;
rtmpPkt->timeStamp = frame.timeStamp;
rtmpPkt->typeId = MSG_AUDIO;
m_pRtmpSrc->onGetMedia(rtmpPkt);
_pRtmpSrc->onGetMedia(rtmpPkt);
}
void RtspToRtmpMediaSource::makeAudioConfigPkt() {
//todo(xzl) 修复此处
#if 0
uint8_t flvStereoOrMono = (m_pParser->getAudioChannel() > 1);
uint8_t flvStereoOrMono = (_pParser->getAudioChannel() > 1);
uint8_t flvSampleRate;
switch (m_pParser->getAudioSampleRate()) {
switch (_pParser->getAudioSampleRate()) {
case 48000:
case 44100:
flvSampleRate = 3;
@@ -168,57 +168,57 @@ void RtspToRtmpMediaSource::makeAudioConfigPkt() {
flvSampleRate = 0;
break;
}
uint8_t flvSampleBit = m_pParser->getAudioSampleBit() == 16;
uint8_t flvSampleBit = _pParser->getAudioSampleBit() == 16;
uint8_t flvAudioType = 10; //aac
m_ui8AudioFlags = (flvAudioType << 4) | (flvSampleRate << 2) | (flvSampleBit << 1) | flvStereoOrMono;
_ui8AudioFlags = (flvAudioType << 4) | (flvSampleRate << 2) | (flvSampleBit << 1) | flvStereoOrMono;
RtmpPacket::Ptr rtmpPkt(new RtmpPacket);
//////////header
uint8_t is_config = true;
rtmpPkt->strBuf.push_back(m_ui8AudioFlags);
rtmpPkt->strBuf.push_back(_ui8AudioFlags);
rtmpPkt->strBuf.push_back(!is_config);
rtmpPkt->strBuf.append(m_pParser->getAudioCfg());
rtmpPkt->strBuf.append(_pParser->getAudioCfg());
rtmpPkt->bodySize = rtmpPkt->strBuf.size();
rtmpPkt->chunkId = CHUNK_AUDIO;
rtmpPkt->streamId = STREAM_MEDIA;
rtmpPkt->timeStamp = 0;
rtmpPkt->typeId = MSG_AUDIO;
m_pRtmpSrc->onGetMedia(rtmpPkt);
_pRtmpSrc->onGetMedia(rtmpPkt);
#endif
}
void RtspToRtmpMediaSource::makeMetaData() {
m_pRtmpSrc.reset(new RtmpMediaSource(getVhost(),getApp(),getId()));
m_pRtmpSrc->setListener(m_listener);
_pRtmpSrc.reset(new RtmpMediaSource(getVhost(),getApp(),getId()));
_pRtmpSrc->setListener(_listener);
AMFValue metaData(AMF_OBJECT);
metaData.set("duration", m_pParser->getDuration());
metaData.set("duration", _pParser->getDuration());
metaData.set("fileSize", 0);
//todo(xzl) 修复此处
#if 0
if (m_pParser->containVideo()) {
metaData.set("width", m_pParser->getVideoWidth());
metaData.set("height", m_pParser->getVideoHeight());
if (_pParser->containVideo()) {
metaData.set("width", _pParser->getVideoWidth());
metaData.set("height", _pParser->getVideoHeight());
metaData.set("videocodecid", "avc1"); //h.264
metaData.set("videodatarate", 5000);
metaData.set("framerate", m_pParser->getVideoFps());
metaData.set("framerate", _pParser->getVideoFps());
makeVideoConfigPkt();
}
if (m_pParser->containAudio()) {
if (_pParser->containAudio()) {
metaData.set("audiocodecid", "mp4a"); //aac
metaData.set("audiodatarate", 160);
metaData.set("audiosamplerate", m_pParser->getAudioSampleRate());
metaData.set("audiosamplesize", m_pParser->getAudioSampleBit());
metaData.set("audiochannels", m_pParser->getAudioChannel());
metaData.set("stereo", m_pParser->getAudioChannel() > 1);
metaData.set("audiosamplerate", _pParser->getAudioSampleRate());
metaData.set("audiosamplesize", _pParser->getAudioSampleBit());
metaData.set("audiochannels", _pParser->getAudioChannel());
metaData.set("stereo", _pParser->getAudioChannel() > 1);
makeAudioConfigPkt();
}
#endif
m_pRtmpSrc->onGetMetaData(metaData);
_pRtmpSrc->onGetMetaData(metaData);
}
} /* namespace Rtsp */
} /* namespace ZL */

View File

@@ -52,11 +52,11 @@ public:
virtual void onGetSDP(const string& strSdp) override{
try {
m_pParser.reset(new RtpParser(strSdp));
m_pRecorder.reset(new MediaRecorder(getVhost(),getApp(),getId(),m_pParser,m_bEnableHls,m_bEnableMp4));
_pParser.reset(new RtpParser(strSdp));
_pRecorder.reset(new MediaRecorder(getVhost(),getApp(),getId(),_pParser,_bEnableHls,_bEnableMp4));
//todo(xzl) 修复此处
// m_pParser->setOnAudioCB( std::bind(&RtspToRtmpMediaSource::onGetAAC, this, placeholders::_1));
// m_pParser->setOnVideoCB( std::bind(&RtspToRtmpMediaSource::onGetH264, this, placeholders::_1));
// _pParser->setOnAudioCB( std::bind(&RtspToRtmpMediaSource::onGetAAC, this, placeholders::_1));
// _pParser->setOnVideoCB( std::bind(&RtspToRtmpMediaSource::onGetH264, this, placeholders::_1));
makeMetaData();
} catch (exception &ex) {
WarnL << ex.what();
@@ -64,22 +64,22 @@ public:
RtspMediaSource::onGetSDP(strSdp);
}
virtual void onGetRTP(const RtpPacket::Ptr &pRtppkt, bool bKeyPos) override{
if (m_pParser) {
bKeyPos = m_pParser->inputRtp(pRtppkt);
if (_pParser) {
bKeyPos = _pParser->inputRtp(pRtppkt);
}
RtspMediaSource::onGetRTP(pRtppkt, bKeyPos);
}
int readerCount(){
return getRing()->readerCount() + (m_pRtmpSrc ? m_pRtmpSrc->getRing()->readerCount() : 0);
return getRing()->readerCount() + (_pRtmpSrc ? _pRtmpSrc->getRing()->readerCount() : 0);
}
void updateTimeStamp(uint32_t uiStamp) {
for (auto &pr : m_mapTracks) {
for (auto &pr : _mapTracks) {
switch (pr.second.type) {
case TrackAudio: {
//todo(xzl) 修复此处
// pr.second.timeStamp = uiStamp * (m_pParser->getAudioSampleRate() / 1000.0);
// pr.second.timeStamp = uiStamp * (_pParser->getAudioSampleRate() / 1000.0);
}
break;
case TrackVideo: {
@@ -100,12 +100,12 @@ private:
void makeAudioConfigPkt();
void makeMetaData();
private:
RtpParser::Ptr m_pParser;
RtmpMediaSource::Ptr m_pRtmpSrc;
uint8_t m_ui8AudioFlags = 0;
MediaRecorder::Ptr m_pRecorder;
bool m_bEnableHls;
bool m_bEnableMp4;
RtpParser::Ptr _pParser;
RtmpMediaSource::Ptr _pRtmpSrc;
uint8_t _ui8AudioFlags = 0;
MediaRecorder::Ptr _pRecorder;
bool _bEnableHls;
bool _bEnableMp4;
};

View File

@@ -48,10 +48,10 @@ UDPServer::~UDPServer() {
}
Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t iLocalPort) {
lock_guard<mutex> lck(m_mtxUpdSock);
lock_guard<mutex> lck(_mtxUpdSock);
string strKey = StrPrinter << strLocalIp << ":" << iTrackIndex << endl;
auto it = m_mapUpdSock.find(strKey);
if (it == m_mapUpdSock.end()) {
auto it = _mapUpdSock.find(strKey);
if (it == _mapUpdSock.end()) {
Socket::Ptr pSock(new Socket());
//InfoL<<localIp;
if (!pSock->bindUdpSock(iLocalPort, strLocalIp)) {
@@ -61,7 +61,7 @@ Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t
pSock->setOnRead(bind(&UDPServer::onRcvData, this, iTrackIndex, placeholders::_1,placeholders::_2));
pSock->setOnErr(bind(&UDPServer::onErr, this, strKey, placeholders::_1));
m_mapUpdSock[strKey] = pSock;
_mapUpdSock[strKey] = pSock;
DebugL << strLocalIp << " " << pSock->get_local_port() << " " << iTrackIndex;
return pSock;
}
@@ -69,15 +69,15 @@ Socket::Ptr UDPServer::getSock(const char* strLocalIp, int iTrackIndex,uint16_t
}
void UDPServer::listenPeer(const char* strPeerIp, void* pSelf, const onRecvData& cb) {
lock_guard<mutex> lck(m_mtxDataHandler);
auto &mapRef = m_mapDataHandler[strPeerIp];
lock_guard<mutex> lck(_mtxDataHandler);
auto &mapRef = _mapDataHandler[strPeerIp];
mapRef.emplace(pSelf, cb);
}
void UDPServer::stopListenPeer(const char* strPeerIp, void* pSelf) {
lock_guard<mutex> lck(m_mtxDataHandler);
auto it0 = m_mapDataHandler.find(strPeerIp);
if (it0 == m_mapDataHandler.end()) {
lock_guard<mutex> lck(_mtxDataHandler);
auto it0 = _mapDataHandler.find(strPeerIp);
if (it0 == _mapDataHandler.end()) {
return;
}
auto &mapRef = it0->second;
@@ -86,22 +86,22 @@ void UDPServer::stopListenPeer(const char* strPeerIp, void* pSelf) {
mapRef.erase(it1);
}
if (mapRef.size() == 0) {
m_mapDataHandler.erase(it0);
_mapDataHandler.erase(it0);
}
}
void UDPServer::onErr(const string& strKey, const SockException& err) {
WarnL << err.what();
lock_guard<mutex> lck(m_mtxUpdSock);
m_mapUpdSock.erase(strKey);
lock_guard<mutex> lck(_mtxUpdSock);
_mapUpdSock.erase(strKey);
}
void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct sockaddr* pPeerAddr) {
//TraceL << trackIndex;
struct sockaddr_in *in = (struct sockaddr_in *) pPeerAddr;
string peerIp = inet_ntoa(in->sin_addr);
lock_guard<mutex> lck(m_mtxDataHandler);
auto it0 = m_mapDataHandler.find(peerIp);
if (it0 == m_mapDataHandler.end()) {
lock_guard<mutex> lck(_mtxDataHandler);
auto it0 = _mapDataHandler.find(peerIp);
if (it0 == _mapDataHandler.end()) {
return;
}
auto &mapRef = it0->second;
@@ -112,7 +112,7 @@ void UDPServer::onRcvData(int iTrackIndex, const Buffer::Ptr &pBuf, struct socka
}
}
if (mapRef.size() == 0) {
m_mapDataHandler.erase(it0);
_mapDataHandler.erase(it0);
}
}

View File

@@ -56,11 +56,11 @@ public:
private:
void onRcvData(int iTrackId, const Buffer::Ptr &pBuf,struct sockaddr *pPeerAddr);
void onErr(const string &strKey,const SockException &err);
unordered_map<string, Socket::Ptr> m_mapUpdSock;
mutex m_mtxUpdSock;
unordered_map<string, Socket::Ptr> _mapUpdSock;
mutex _mtxUpdSock;
unordered_map<string, unordered_map<void *, onRecvData> > m_mapDataHandler;
mutex m_mtxDataHandler;
unordered_map<string, unordered_map<void *, onRecvData> > _mapDataHandler;
mutex _mtxDataHandler;
};
} /* namespace Rtsp */