简化代码:

- MediaSource引入shortUrl和getUrl来简化日志输出
- WebApi引入fillSockInfo
This commit is contained in:
cqm
2022-09-07 11:06:39 +08:00
parent 4f47b7a5fe
commit 999e0b274e
19 changed files with 114 additions and 165 deletions

View File

@@ -13,6 +13,7 @@
#include "Util/util.h"
#include "Network/sockutil.h"
#include "Network/TcpSession.h"
#include "Util/NoticeCenter.h"
using namespace std;
using namespace toolkit;
@@ -24,7 +25,11 @@ namespace toolkit {
namespace mediakit {
static recursive_mutex s_media_source_mtx;
static MediaSource::SchemaVhostAppStreamMap s_media_source_map;
using StreamMap = unordered_map<string/*strema_id*/, weak_ptr<MediaSource> >;
using AppStreamMap = unordered_map<string/*app*/, StreamMap>;
using VhostAppStreamMap = unordered_map<string/*vhost*/, AppStreamMap>;
using SchemaVhostAppStreamMap = unordered_map<string/*schema*/, VhostAppStreamMap>;
static SchemaVhostAppStreamMap s_media_source_map;
string getOriginTypeString(MediaOriginType type){
#define SWITCH_CASE(type) case MediaOriginType::type : return #type
@@ -43,10 +48,6 @@ string getOriginTypeString(MediaOriginType type){
}
}
static string getOriginUrl_l(const MediaSource *thiz) {
return thiz->getSchema() + "://" + thiz->getVhost() + "/" + thiz->getApp() + "/" + thiz->getId();
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////
struct MediaSourceNull : public MediaSource {
MediaSourceNull() : MediaSource("schema", "vhost", "app", "stream") {};
@@ -109,16 +110,12 @@ std::shared_ptr<void> MediaSource::getOwnership() {
}
int MediaSource::getBytesSpeed(TrackType type){
if(type == TrackInvalid){
if(type == TrackInvalid || type == TrackMax){
return _speed[TrackVideo].getSpeed() + _speed[TrackAudio].getSpeed();
}
return _speed[type].getSpeed();
}
uint64_t MediaSource::getCreateStamp() const {
return _create_stamp;
}
uint64_t MediaSource::getAliveSecond() const {
//使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退
return _ticker.createdTime() / 1000;
@@ -140,6 +137,7 @@ std::weak_ptr<MediaSourceEvent> MediaSource::getListener(bool next) const{
if (!next) {
return _listener;
}
auto listener = dynamic_pointer_cast<MediaSourceEventInterceptor>(_listener.lock());
if (!listener) {
//不是MediaSourceEventInterceptor对象或者对象已经销毁
@@ -170,13 +168,13 @@ MediaOriginType MediaSource::getOriginType() const {
string MediaSource::getOriginUrl() const {
auto listener = _listener.lock();
if (!listener) {
return getOriginUrl_l(this);
return getUrl();
}
auto ret = listener->getOriginUrl(const_cast<MediaSource &>(*this));
if (!ret.empty()) {
return ret;
}
return getOriginUrl_l(this);
return getUrl();
}
std::shared_ptr<SockInfo> MediaSource::getOriginSock() const {
@@ -253,7 +251,7 @@ void MediaSource::onReaderChanged(int size) {
bool MediaSource::setupRecord(Recorder::type type, bool start, const string &custom_path, size_t max_second){
auto listener = _listener.lock();
if (!listener) {
WarnL << "未设置MediaSource的事件监听者setupRecord失败:" << getSchema() << "/" << getVhost() << "/" << getApp() << "/" << getId();
WarnL << "未设置MediaSource的事件监听者setupRecord失败:" << getUrl();
return false;
}
return listener->setupRecord(*this, type, start, custom_path, max_second);
@@ -337,7 +335,7 @@ void MediaSource::for_each_media(const function<void(const Ptr &src)> &cb,
static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, const string &app, const string &id, bool from_mp4) {
string vhost = vhost_in;
GET_CONFIG(bool,enableVhost,General::kEnableVhost);
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
if(vhost.empty() || !enableVhost){
vhost = DEFAULT_VHOST;
}
@@ -351,7 +349,7 @@ static MediaSource::Ptr find_l(const string &schema, const string &vhost_in, con
MediaSource::for_each_media([&](const MediaSource::Ptr &src) { ret = std::move(const_cast<MediaSource::Ptr &>(src)); }, schema, vhost, app, id);
if(!ret && from_mp4 && schema != HLS_SCHEMA){
//未找媒体源则读取mp4创建一个
//未找媒体源则读取mp4创建一个
//播放hls不触发mp4点播(因为HLS也可以用于录像不是纯粹的直播)
ret = MediaSource::createFromMP4(schema, vhost, app, id);
}
@@ -379,7 +377,7 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &s
};
auto on_timeout = poller->doDelayTask(maxWaitMS, [cb_once, listener_tag]() {
//最多等待一定时间,如这个时间内,流未注册上,那么返回未找到流
// 最多等待一定时间,如这个时间内,流未注册上,则返回空
NoticeCenter::Instance().delListener(listener_tag, Broadcast::kBroadcastMediaChanged);
cb_once(nullptr);
return 0;
@@ -402,17 +400,15 @@ static void findAsync_l(const MediaInfo &info, const std::shared_ptr<Session> &s
//不是自己感兴趣的事件,忽略之
return;
}
poller->async([weak_session, cancel_all, info, cb_once]() {
cancel_all();
auto strong_session = weak_session.lock();
if (!strong_session) {
//自己已经销毁
return;
if (auto strong_session = weak_session.lock()) {
//播发器请求的流终于注册上了,切换到自己的线程再回复
DebugL << "收到媒体注册事件,回复播放器:" << info.getUrl();
//再找一遍媒体源,一般能找到
findAsync_l(info, strong_session, false, cb_once);
}
//播发器请求的流终于注册上了,切换到自己的线程再回复
DebugL << "收到媒体注册事件,回复播放器:" << info._schema << "/" << info._vhost << "/" << info._app << "/" << info._streamid;
//再找一遍媒体源,一般能找到
findAsync_l(info, strong_session, false, cb_once);
}, false);
};
@@ -458,7 +454,7 @@ void MediaSource::emitEvent(bool regist){
}
//触发广播
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastMediaChanged, regist, *this);
InfoL << (regist ? "媒体注册:" : "媒体注销:") << _schema << " " << _vhost << " " << _app << " " << _stream_id;
InfoL << (regist ? "媒体注册:" : "媒体注销:") << getUrl();
}
void MediaSource::regist() {
@@ -472,7 +468,7 @@ void MediaSource::regist() {
return;
}
//增加判断, 防止当前流已注册时再次注册
throw std::invalid_argument("media source already existed:" + _schema + "/" + _vhost + "/" + _app + "/" + _stream_id);
throw std::invalid_argument("media source already existed:" + getUrl());
}
ref = shared_from_this();
}
@@ -519,9 +515,9 @@ bool MediaSource::unregist() {
/////////////////////////////////////MediaInfo//////////////////////////////////////
void MediaInfo::parse(const string &url_in){
void MediaInfo::parse(const std::string &url_in){
_full_url = url_in;
string url = url_in;
auto url = url_in;
auto pos = url.find("?");
if (pos != string::npos) {
_param_strs = url.substr(pos + 1);
@@ -621,11 +617,7 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastStreamNoneReader, *strong_sender);
} else {
//这个是mp4点播我们自动关闭
WarnL << "MP4点播无人观看,自动关闭:"
<< strong_sender->getSchema() << "/"
<< strong_sender->getVhost() << "/"
<< strong_sender->getApp() << "/"
<< strong_sender->getId();
WarnL << "MP4点播无人观看,自动关闭:" << strong_sender->getUrl();
strong_sender->close(false);
}
return false;
@@ -633,7 +625,7 @@ void MediaSourceEvent::onReaderChanged(MediaSource &sender, int size){
}
string MediaSourceEvent::getOriginUrl(MediaSource &sender) const {
return getOriginUrl_l(&sender);
return sender.getUrl();
}
MediaOriginType MediaSourceEventInterceptor::getOriginType(MediaSource &sender) const {

View File

@@ -19,13 +19,8 @@
#include <unordered_map>
#include "Common/config.h"
#include "Common/Parser.h"
#include "Util/logger.h"
#include "Util/TimeTicker.h"
#include "Util/NoticeCenter.h"
#include "Util/List.h"
#include "Network/Socket.h"
#include "Rtsp/Rtsp.h"
#include "Rtmp/Rtmp.h"
#include "Extension/Track.h"
#include "Record/Recorder.h"
@@ -85,7 +80,7 @@ public:
// 通知观看人数变化
virtual void onReaderChanged(MediaSource &sender, int size);
//流注册或注销事件
virtual void onRegist(MediaSource &sender, bool regist) {};
virtual void onRegist(MediaSource &sender, bool regist) {}
// 获取丢包率
virtual float getLossRate(MediaSource &sender, TrackType type) { return -1; }
// 获取所在线程, 此函数一般强制重载
@@ -95,7 +90,7 @@ public:
// 开启或关闭录制
virtual bool setupRecord(MediaSource &sender, Recorder::type type, bool start, const std::string &custom_path, size_t max_second) { return false; };
// 获取录制状态
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; };
virtual bool isRecording(MediaSource &sender, Recorder::type type) { return false; }
// 获取所有track相关信息
virtual std::vector<Track::Ptr> getMediaTracks(MediaSource &sender, bool trackReady = true) const { return std::vector<Track::Ptr>(); };
@@ -180,7 +175,12 @@ public:
MediaInfo() {}
MediaInfo(const std::string &url) { parse(url); }
void parse(const std::string &url);
std::string shortUrl() const {
return _vhost + "/" + _app + "/" + _streamid;
}
std::string getUrl() const {
return _schema + "://" + shortUrl();
}
public:
std::string _full_url;
std::string _schema;
@@ -199,10 +199,6 @@ class MediaSource: public TrackSource, public std::enable_shared_from_this<Media
public:
static MediaSource& NullMediaSource();
using Ptr = std::shared_ptr<MediaSource>;
using StreamMap = std::unordered_map<std::string/*stream_id*/, std::weak_ptr<MediaSource> >;
using AppStreamMap = std::unordered_map<std::string/*app*/, StreamMap>;
using VhostAppStreamMap = std::unordered_map<std::string/*vhost*/, AppStreamMap>;
using SchemaVhostAppStreamMap = std::unordered_map<std::string/*schema*/, VhostAppStreamMap>;
MediaSource(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &stream_id) ;
virtual ~MediaSource();
@@ -218,6 +214,13 @@ public:
// 流id
const std::string& getId() const;
std::string shortUrl() const {
return _vhost + "/" + _app + "/" + _stream_id;
}
std::string getUrl() const {
return _schema + "://" + shortUrl();
}
//获取对象所有权
std::shared_ptr<void> getOwnership();
@@ -232,7 +235,7 @@ public:
// 获取数据速率单位bytes/s
int getBytesSpeed(TrackType type = TrackInvalid);
// 获取流创建GMT unix时间戳单位秒
uint64_t getCreateStamp() const;
uint64_t getCreateStamp() const {return _create_stamp;}
// 获取流上线时间,单位秒
uint64_t getAliveSecond() const;
@@ -288,8 +291,11 @@ public:
// 同步查找流
static Ptr find(const std::string &schema, const std::string &vhost, const std::string &app, const std::string &id, bool from_mp4 = false);
static Ptr find(const MediaInfo &info, bool from_mp4 = false) {
return find(info._schema, info._vhost, info._app, info._streamid, from_mp4);
}
// 忽略类型同步查找流可能返回rtmp/rtsp/hls类型
// 忽略schema同步查找流可能返回rtmp/rtsp/hls类型
static Ptr find(const std::string &vhost, const std::string &app, const std::string &stream_id, bool from_mp4 = false);
// 异步查找流
@@ -335,6 +341,7 @@ public:
bool isFlushAble(bool is_video, bool is_key, uint64_t new_stamp, size_t cache_size);
private:
// 音视频的最后时间戳
uint64_t _last_stamp[2] = {0, 0};
};

View File

@@ -103,9 +103,7 @@ void HttpSession::onError(const SockException& err) {
//flv/ts播放器
uint64_t duration = _ticker.createdTime() / 1000;
WarnP(this) << "FLV/TS/FMP4播放器("
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
<< _mediaInfo.shortUrl()
<< ")断开:" << err.what()
<< ",耗时(s):" << duration;

View File

@@ -160,7 +160,7 @@ bool PlayerProxy::close(MediaSource &sender, bool force) {
strongSelf->teardown();
});
_on_close(SockException(Err_shutdown, "closed by user"));
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
WarnL << sender.getUrl() << " " << force;
return true;
}

View File

@@ -40,8 +40,7 @@ HlsCookieData::~HlsCookieData() {
if (*_added) {
uint64_t duration = (_ticker.createdTime() - _ticker.elapsedTime()) / 1000;
WarnL << _sock_info->getIdentifier() << "(" << _sock_info->get_peer_ip() << ":" << _sock_info->get_peer_port()
<< ") " << "HLS播放器(" << _info._vhost << "/" << _info._app << "/" << _info._streamid
<< ")断开,耗时(s):" << duration;
<< ") " << "HLS播放器(" << _info.shortUrl() << ")断开,耗时(s):" << duration;
GET_CONFIG(uint32_t, iFlowThreshold, General::kFlowThreshold);
uint64_t bytes = _bytes.load();

View File

@@ -235,7 +235,7 @@ bool MP4Reader::close(MediaSource &sender, bool force) {
return false;
}
_timer.reset();
WarnL << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
WarnL << sender.getUrl() << " " << force;
return true;
}

View File

@@ -63,6 +63,7 @@ void MP4Recorder::createFile() {
WarnL << ex.what();
}
}
void MP4Recorder::asyncClose() {
auto muxer = _muxer;
auto full_path_tmp = _full_path_tmp;

View File

@@ -31,9 +31,7 @@ void RtmpSession::onError(const SockException& err) {
bool is_player = !_push_src_ownership;
uint64_t duration = _ticker.createdTime() / 1000;
WarnP(this) << (is_player ? "RTMP播放器(" : "RTMP推流器(")
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< _media_info.shortUrl()
<< ")断开:" << err.what()
<< ",耗时(s):" << duration;
@@ -256,10 +254,7 @@ void RtmpSession::sendPlayResponse(const string &err, const RtmpMediaSource::Ptr
"clientid", "0" });
if (!ok) {
string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " "
<< _media_info._vhost << " "
<< _media_info._app << " "
<< _media_info._streamid;
string err_msg = StrPrinter << (auth_success ? "no such stream:" : err.data()) << " " << _media_info.shortUrl();
shutdown(SockException(Err_shutdown, err_msg));
return;
}
@@ -579,7 +574,7 @@ bool RtmpSession::close(MediaSource &sender,bool force) {
if(!_push_src || (!force && _push_src->totalReaderCount())){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err));
return true;
}
@@ -619,6 +614,6 @@ void RtmpSession::dumpMetadata(const AMFValue &metadata) {
metadata.object_for_each([&](const string &key, const AMFValue &val) {
printer << "\r\n" << key << "\t:" << val.to_string();
});
InfoL << _media_info._vhost << " " << _media_info._app << " " << _media_info._streamid << (string) printer;
InfoL << _media_info.shortUrl() << (string) printer;
}
} /* namespace mediakit */

View File

@@ -52,9 +52,7 @@ RtpProcess::RtpProcess(const string &stream_id) {
RtpProcess::~RtpProcess() {
uint64_t duration = (_last_frame_time.createdTime() - _last_frame_time.elapsedTime()) / 1000;
WarnP(this) << "RTP推流器("
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< _media_info.shortUrl()
<< ")断开,耗时(s):" << duration;
//流量统计事件广播
@@ -272,7 +270,7 @@ MediaOriginType RtpProcess::getOriginType(MediaSource &sender) const{
}
string RtpProcess::getOriginUrl(MediaSource &sender) const {
return _media_info._schema + "://" + _media_info._vhost + "/" + _media_info._app + "/" + _media_info._streamid;
return _media_info.getUrl();
}
std::shared_ptr<SockInfo> RtpProcess::getOriginSock(MediaSource &sender) const {

View File

@@ -144,7 +144,7 @@ bool RtpProcessHelper::close(MediaSource &sender, bool force) {
return false;
}
parent->delProcess(_stream_id, _process.get());
WarnL << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
WarnL << "close media:" << sender.getUrl() << " " << force;
return true;
}

View File

@@ -129,7 +129,7 @@ bool RtpSession::close(MediaSource &sender, bool force) {
if(!_process || (!force && static_pointer_cast<MediaSourceEvent>(_process)->totalReaderCount(sender))){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err));
return true;
}

View File

@@ -63,9 +63,7 @@ void RtspSession::onError(const SockException &err) {
bool is_player = !_push_src_ownership;
uint64_t duration = _alive_ticker.createdTime() / 1000;
WarnP(this) << (is_player ? "RTSP播放器(" : "RTSP推流器(")
<< _media_info._vhost << "/"
<< _media_info._app << "/"
<< _media_info._streamid
<< _media_info.shortUrl()
<< ")断开:" << err.what()
<< ",耗时(s):" << duration;
@@ -249,9 +247,7 @@ void RtspSession::handleReq_ANNOUNCE(const Parser &parser) {
if (push_failed) {
sendRtspResponse("406 Not Acceptable", { "Content-Type", "text/plain" }, "Already publishing.");
string err = StrPrinter << "ANNOUNCE:"
<< "Already publishing:" << _media_info._vhost << " " << _media_info._app << " "
<< _media_info._streamid << endl;
string err = StrPrinter << "ANNOUNCE: Already publishing:" << _media_info.shortUrl() << endl;
throw SockException(Err_shutdown, err);
}
@@ -417,7 +413,7 @@ void RtspSession::onAuthSuccess() {
auto rtsp_src = dynamic_pointer_cast<RtspMediaSource>(src);
if (!rtsp_src) {
//未找到相应的MediaSource
string err = StrPrinter << "no such stream:" << strong_self->_media_info._vhost << " " << strong_self->_media_info._app << " " << strong_self->_media_info._streamid;
string err = StrPrinter << "no such stream:" << strong_self->_media_info.shortUrl();
strong_self->send_StreamNotFound();
strong_self->shutdown(SockException(Err_shutdown,err));
return;
@@ -1134,7 +1130,7 @@ bool RtspSession::close(MediaSource &sender, bool force) {
if(!_push_src || (!force && _push_src->totalReaderCount())){
return false;
}
string err = StrPrinter << "close media:" << sender.getSchema() << "/" << sender.getVhost() << "/" << sender.getApp() << "/" << sender.getId() << " " << force;
string err = StrPrinter << "close media:" << sender.getUrl() << " " << force;
safeShutdown(SockException(Err_shutdown,err));
return true;
}

View File

@@ -23,12 +23,7 @@ public:
MediaSource::for_each_media([&](const MediaSource::Ptr &media) {
if (ini.find("list") != ini.end()) {
//列出源
(*stream) << "\t"
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
(*stream) << "\t" << media->getUrl() << "\r\n";
return;
}
@@ -42,20 +37,10 @@ public:
if (!media->close(true)) {
break;
}
(*stream) << "\t踢出成功:"
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
(*stream) << "\t踢出成功:" << media->getUrl() << "\r\n";
return;
} while (0);
(*stream) << "\t踢出失败:"
<< media->getSchema() << "/"
<< media->getVhost() << "/"
<< media->getApp() << "/"
<< media->getId()
<< "\r\n";
(*stream) << "\t踢出失败:" << media->getUrl() << "\r\n";
}
}, false);