初步提交2.0版本,支持虚拟主机

This commit is contained in:
xiongziliang
2018-02-02 18:06:08 +08:00
parent 1262c4c51d
commit bd72a69d33
44 changed files with 354 additions and 730 deletions

View File

@@ -32,30 +32,6 @@ using namespace ZL::MediaFile;
namespace ZL {
namespace Rtmp {
recursive_mutex RtmpMediaSource::g_mtxMediaSrc;
unordered_map<string, unordered_map<string,weak_ptr<RtmpMediaSource> > > RtmpMediaSource::g_mapMediaSrc;
RtmpMediaSource::Ptr RtmpMediaSource::find(const string &strApp, const string &strId, bool bMake) {
//查找某一媒体源,找到后返回
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
auto itApp = g_mapMediaSrc.find(strApp);
if (itApp == g_mapMediaSrc.end()) {
return bMake ? MediaReader::onMakeRtmp(strApp, strId) : nullptr;
}
auto itId = itApp->second.find(strId);
if (itId == itApp->second.end()) {
return bMake ? MediaReader::onMakeRtmp(strApp, strId) : nullptr;
}
auto ret = itId->second.lock();
if (ret) {
return ret;
}
itApp->second.erase(itId);
if (itApp->second.size() == 0) {
g_mapMediaSrc.erase(itApp);
}
return bMake ? MediaReader::onMakeRtmp(strApp, strId) : nullptr;
}
} /* namespace Rtmp */
} /* namespace ZL */

View File

@@ -36,6 +36,7 @@
#include "Rtmp.h"
#include "Common/config.h"
#include "Common/MediaSender.h"
#include "Common/MediaSource.h"
#include "Util/util.h"
#include "Util/logger.h"
#include "Util/RingBuffer.h"
@@ -47,72 +48,28 @@
using namespace std;
using namespace ZL::Util;
using namespace ZL::Thread;
using namespace ZL::Media;
namespace ZL {
namespace Rtmp {
class RtmpMediaSource: public enable_shared_from_this<RtmpMediaSource> {
class RtmpMediaSource: public MediaSource {
public:
typedef std::shared_ptr<RtmpMediaSource> Ptr;
typedef RingBuffer<RtmpPacket::Ptr> RingType;
RtmpMediaSource(const string &strApp, const string &strId) :
m_strApp(strApp),
m_strId(strId),
RtmpMediaSource(const string &vhost,const string &strApp, const string &strId) :
MediaSource(RTMP_SCHEMA,vhost,strApp,strId),
m_pRing(new RingBuffer<RtmpPacket::Ptr>()),
m_thPool( MediaSender::sendThread()) {
}
virtual ~RtmpMediaSource() {
unregist();
}
virtual ~RtmpMediaSource() {}
const RingType::Ptr &getRing() const {
//获取媒体源的rtp环形缓冲
return m_pRing;
}
virtual void regist() {
//注册该源注册后rtmp服务器才能找到该源
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
if (!g_mapMediaSrc[m_strApp].erase(m_strId)) {
InfoL << "Rtmp src:" << m_strApp << " " << m_strId;
}
g_mapMediaSrc[m_strApp].emplace(m_strId, shared_from_this());
NoticeCenter::Instance().emitEvent(Config::Broadcast::kBroadcastRtmpSrcRegisted,m_strApp.data(),m_strId.data());
}
virtual void unregist() {
//反注册该源
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
auto it = g_mapMediaSrc.find(m_strApp);
if (it == g_mapMediaSrc.end()) {
return;
}
if (it->second.erase(m_strId)) {
if (it->second.size() == 0) {
g_mapMediaSrc.erase(it);
}
InfoL << "Rtmp src:" << m_strApp << " " << m_strId;
}
}
static set<string> getMediaSet() {
set<string> ret;
lock_guard<recursive_mutex> lock(g_mtxMediaSrc);
for (auto &pr0 : g_mapMediaSrc) {
for (auto &pr1 : pr0.second) {
if (pr1.second.lock()) {
ret.emplace(pr0.first + "/" + pr1.first);
}
}
}
return ret;
}
static Ptr find(const string &strApp, const string &strId, bool bMake = true) ;
const string& getApp() const {
//获取该源的id
return m_strApp;
}
const string& getId() const {
//获取该源的id
return m_strId;
}
const AMFValue &getMetaData() const {
return m_metadata;
}
@@ -140,37 +97,12 @@ public:
_ring->write(pkt,pkt->isVideoKeyFrame());
});
}
bool seekTo(uint32_t ui32Stamp) {
if (!m_onSeek) {
return false;
}
return m_onSeek(ui32Stamp);
}
virtual void setOnSeek(const function<bool(uint32_t)> &cb) {
m_onSeek = cb;
}
uint32_t getStamp() {
if (!m_onStamp) {
return 0;
}
return m_onStamp();
}
virtual void setOnStamp(const function<uint32_t()> &cb) {
m_onStamp = cb;
}
protected:
function<bool(uint32_t)> m_onSeek;
function<uint32_t()> m_onStamp;
private:
AMFValue m_metadata;
unordered_map<int, RtmpPacket::Ptr> m_mapCfgFrame;
mutable recursive_mutex m_mtxMap;
string m_strApp; //媒体app
string m_strId; //媒体id
RingBuffer<RtmpPacket::Ptr>::Ptr m_pRing; //rtp环形缓冲
ThreadPool &m_thPool;
static unordered_map<string, unordered_map<string,weak_ptr<RtmpMediaSource> > > g_mapMediaSrc; //静态的媒体源表
static recursive_mutex g_mtxMediaSrc; ///访问静态的媒体源表的互斥锁
};
} /* namespace Rtmp */

View File

@@ -36,10 +36,10 @@ namespace ZL {
namespace Rtmp {
unordered_map<string, RtmpPusher::rtmpCMDHandle> RtmpPusher::g_mapCmd;
RtmpPusher::RtmpPusher(const char *strApp,const char *strStream) {
auto src = RtmpMediaSource::find(strApp,strStream);
RtmpPusher::RtmpPusher(const char *strVhost,const char *strApp,const char *strStream) {
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,strVhost,strApp,strStream));
if (!src) {
auto strErr = StrPrinter << "media source:" << strApp << "/" << strStream << "not found!" << endl;
auto strErr = StrPrinter << "media source:" << strVhost << "/" << strApp << "/" << strStream << "not found!" << endl;
throw std::runtime_error(strErr);
}
init(src);

View File

@@ -38,7 +38,7 @@ class RtmpPusher: public RtmpProtocol , public TcpClient{
public:
typedef std::shared_ptr<RtmpPusher> Ptr;
typedef std::function<void(const SockException &ex)> Event;
RtmpPusher(const char *strApp,const char *strStream);
RtmpPusher(const char *strVhost,const char *strApp,const char *strStream);
RtmpPusher(const RtmpMediaSource::Ptr &src);
virtual ~RtmpPusher();

View File

@@ -97,7 +97,12 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
///////////set peerBandwidth////////////////
sendPeerBandwidth(5000000);
m_strApp = params["app"].as_string();
m_mediaInfo.m_app = params["app"].as_string();
m_strTcUrl = params["tcUrl"].as_string();
if(m_strTcUrl.empty()){
//defaultVhost:默认vhost
m_strTcUrl = "rtmp://127.0.0.1/" + m_mediaInfo.m_app;
}
bool ok = true; //(app == APP_NAME);
AMFValue version(AMF_OBJECT);
version.set("fmsVer", "FMS/3,0,1,123");
@@ -109,7 +114,7 @@ void RtmpSession::onCmd_connect(AMFDecoder &dec) {
status.set("objectEncoding", amfVer);
sendReply(ok ? "_result" : "_error", version, status);
if (!ok) {
throw std::runtime_error("Unsupported application: " + m_strApp);
throw std::runtime_error("Unsupported application: " + m_mediaInfo.m_app);
}
AMFEncoder invoke;
@@ -123,12 +128,12 @@ void RtmpSession::onCmd_createStream(AMFDecoder &dec) {
void RtmpSession::onCmd_publish(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */
m_strId = dec.load<std::string>();
auto iPos = m_strId.find('?');
if (iPos != string::npos) {
m_strId.erase(iPos);
}
auto src = RtmpMediaSource::find(m_strApp,m_strId,false);
m_mediaInfo.parse(m_strTcUrl + "/" + dec.load<std::string>());
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
m_mediaInfo.m_vhost,
m_mediaInfo.m_app,
m_mediaInfo.m_streamid,
false));
bool ok = (!src && !m_pPublisherSrc);
AMFValue status(AMF_OBJECT);
status.set("level", ok ? "status" : "error");
@@ -137,10 +142,13 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
if (!ok) {
throw std::runtime_error( StrPrinter << "Already publishing:" << m_strApp << "/" << m_strId << endl);
throw std::runtime_error( StrPrinter << "Already publishing:"
<< m_mediaInfo.m_vhost << " "
<< m_mediaInfo.m_app << " "
<< m_mediaInfo.m_streamid << endl);
}
m_bPublisherSrcRegisted = false;
m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_strApp,m_strId));
m_pPublisherSrc.reset(new RtmpToRtspMediaSource(m_mediaInfo.m_vhost,m_mediaInfo.m_app,m_mediaInfo.m_streamid));
}
void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
@@ -152,11 +160,15 @@ void RtmpSession::onCmd_deleteStream(AMFDecoder &dec) {
throw std::runtime_error(StrPrinter << "Stop publishing." << endl);
}
void RtmpSession::doPlay(){
auto src = RtmpMediaSource::find(m_strApp,m_strId,true);
void RtmpSession::doPlay(AMFDecoder &dec){
m_mediaInfo.parse(m_strTcUrl + "/" + dec.load<std::string>());
auto src = dynamic_pointer_cast<RtmpMediaSource>(MediaSource::find(RTMP_SCHEMA,
m_mediaInfo.m_vhost,
m_mediaInfo.m_app,
m_mediaInfo.m_streamid,
true));
bool ok = (src.operator bool());
ok = ok && src->ready();
//stream begin
sendUserControl(CONTROL_STREAM_BEGIN, STREAM_MEDIA);
@@ -165,11 +177,15 @@ void RtmpSession::doPlay(){
status.set("level", ok ? "status" : "error");
status.set("code", ok ? "NetStream.Play.Reset" : "NetStream.Play.StreamNotFound");
status.set("description", ok ? "Resetting and playing." : "No such stream.");
status.set("details", m_strId);
status.set("details", m_mediaInfo.m_streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
if (!ok) {
throw std::runtime_error( StrPrinter << "No such stream:" << m_strApp << " " << m_strId << endl);
throw std::runtime_error( StrPrinter << "No such stream:"
<< m_mediaInfo.m_vhost << " "
<< m_mediaInfo.m_app << " "
<< m_mediaInfo.m_streamid
<< endl);
}
// onStatus(NetStream.Play.Start)
@@ -177,7 +193,7 @@ void RtmpSession::doPlay(){
status.set("level", "status");
status.set("code", "NetStream.Play.Start");
status.set("description", "Started playing.");
status.set("details", m_strId);
status.set("details", m_mediaInfo.m_streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
@@ -198,7 +214,7 @@ void RtmpSession::doPlay(){
status.set("level", "status");
status.set("code", "NetStream.Play.PublishNotify");
status.set("description", "Now published.");
status.set("details", m_strId);
status.set("details", m_mediaInfo.m_streamid);
status.set("clientid", "0");
sendReply("onStatus", nullptr, status);
@@ -239,18 +255,19 @@ void RtmpSession::doPlay(){
if(src->getRing()->readerCount() == 1){
src->seekTo(0);
}
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaPlayed,
RTMP_SCHEMA,
m_mediaInfo.m_vhost.data(),
m_mediaInfo.m_app.data(),
m_mediaInfo.m_streamid.data());
}
void RtmpSession::onCmd_play2(AMFDecoder &dec) {
doPlay();
doPlay(dec);
}
void RtmpSession::onCmd_play(AMFDecoder &dec) {
dec.load<AMFValue>();/* NULL */
m_strId = dec.load<std::string>();
auto iPos = m_strId.find('?');
if (iPos != string::npos) {
m_strId.erase(iPos);
}
doPlay();
doPlay(dec);
}
void RtmpSession::onCmd_pause(AMFDecoder &dec) {

View File

@@ -53,8 +53,8 @@ public:
void onError(const SockException &err) override;
void onManager() override;
private:
std::string m_strApp;
std::string m_strId;
std::string m_strTcUrl;
MediaInfo m_mediaInfo;
double m_dNowReqID = 0;
Ticker m_ticker;//数据接收时间
typedef void (RtmpSession::*rtmpCMDHandle)(AMFDecoder &dec);
@@ -75,7 +75,7 @@ private:
void onCmd_play(AMFDecoder &dec);
void onCmd_play2(AMFDecoder &dec);
void doPlay();
void doPlay(AMFDecoder &dec);
void onCmd_seek(AMFDecoder &dec);
void onCmd_pause(AMFDecoder &dec);
void setMetaData(AMFDecoder &dec);

View File

@@ -36,25 +36,23 @@ using namespace ZL::Network;
namespace ZL {
namespace Rtmp {
#ifdef ENABLE_RTMP2RTSP
RtmpToRtspMediaSource::RtmpToRtspMediaSource(const string &_app, const string &_id) :
RtmpMediaSource(_app,_id) {
}
RtmpToRtspMediaSource::~RtmpToRtspMediaSource() {
RtmpToRtspMediaSource::RtmpToRtspMediaSource(const string &vhost,const string &app, const string &id) :
RtmpMediaSource(vhost,app,id) {
}
RtmpToRtspMediaSource::~RtmpToRtspMediaSource() {}
void RtmpToRtspMediaSource::regist() {
RtmpMediaSource::regist();
bool RtmpToRtspMediaSource::regist() {
if (m_pRtspSrc) {
m_pRtspSrc->regist();
}
return MediaSource::regist();
}
void RtmpToRtspMediaSource::unregist() {
RtmpMediaSource::unregist();
bool RtmpToRtspMediaSource::unregist() {
if(m_pRtspSrc){
m_pRtspSrc->unregist();
}
return MediaSource::unregist();
}
void RtmpToRtspMediaSource::onGetH264(const H264Frame &frame) {
@@ -156,14 +154,12 @@ void RtmpToRtspMediaSource::makeSDP() {
<< "\r\n" << endl;
}
m_pRtspSrc.reset(new RtspMediaSource(getApp(),getId()));
m_pRtspSrc->setOnSeek(m_onSeek);
m_pRtspSrc->setOnStamp(m_onStamp);
m_pRtspSrc.reset(new RtspMediaSource(getVhost(),getApp(),getId()));
m_pRtspSrc->setListener(m_listener);
m_pRtspSrc->onGetSDP(strSDP);
m_pRtspSrc->regist();
}
#endif // ENABLE_RTMP2RTSP
} /* namespace Rtmp */
} /* namespace ZL */

View File

@@ -52,19 +52,20 @@ using namespace ZL::MediaFile;
namespace ZL {
namespace Rtmp {
#ifdef ENABLE_RTMP2RTSP
class RtmpToRtspMediaSource: public RtmpMediaSource {
public:
typedef std::shared_ptr<RtmpToRtspMediaSource> Ptr;
RtmpToRtspMediaSource(const string &_app, const string &_id);
virtual ~RtmpToRtspMediaSource();
virtual void regist() override;
virtual void unregist() override;
virtual void onGetMetaData(const AMFValue &_metadata) override {
RtmpToRtspMediaSource(const string &vhost,const string &app, const string &id);
virtual ~RtmpToRtspMediaSource();
bool regist() override;
bool unregist() override;
void onGetMetaData(const AMFValue &_metadata) override {
try {
m_pParser.reset(new RtmpParser(_metadata));
m_pRecorder.reset(new MediaRecorder(getApp(),getId(),m_pParser));
m_pRecorder.reset(new MediaRecorder(getVhost(),getApp(),getId(),m_pParser));
m_pParser->setOnAudioCB(std::bind(&RtmpToRtspMediaSource::onGetAdts, this, placeholders::_1));
m_pParser->setOnVideoCB(std::bind(&RtmpToRtspMediaSource::onGetH264, this, placeholders::_1));
} catch (exception &ex) {
@@ -73,7 +74,7 @@ public:
RtmpMediaSource::onGetMetaData(_metadata);
}
virtual void onGetMedia(const RtmpPacket::Ptr &pkt) override {
void onGetMedia(const RtmpPacket::Ptr &pkt) override {
if (m_pParser) {
if (!m_pRtspSrc && m_pParser->isInited()) {
makeSDP();
@@ -82,18 +83,7 @@ public:
}
RtmpMediaSource::onGetMedia(pkt);
}
void setOnSeek(const function<bool(uint32_t)> &cb) override {
RtmpMediaSource::setOnSeek(cb);
if (m_pRtspSrc) {
m_pRtspSrc->setOnSeek(cb);
}
}
void setOnStamp(const function<uint32_t()> &cb) override{
RtmpMediaSource::setOnStamp(cb);
if (m_pRtspSrc) {
m_pRtspSrc->setOnStamp(cb);
}
}
private:
RtmpParser::Ptr m_pParser;
RtspMediaSource::Ptr m_pRtspSrc;
@@ -105,9 +95,6 @@ private:
void onGetAdts(const AdtsFrame &frame);
void makeSDP();
};
#else
typedef RtmpMediaSource RtmpToRtspMediaSource;
#endif //ENABLE_RTMP2RTSP
} /* namespace Rtmp */
} /* namespace ZL */