Media tuple refactor (#3715)

Co-authored-by: cqm <cqm@97kid.com>
This commit is contained in:
mtdxc
2024-07-14 09:32:41 +08:00
committed by GitHub
parent 0eb38635ce
commit d735aa1797
18 changed files with 61 additions and 59 deletions

View File

@@ -595,7 +595,8 @@ MediaSource::Ptr MediaSource::createFromMP4(const string &schema, const string &
}
#ifdef ENABLE_MP4
try {
auto reader = std::make_shared<MP4Reader>(vhost, app, stream, file_path);
MediaTuple tuple = {vhost, app, stream, ""};
auto reader = std::make_shared<MP4Reader>(tuple, file_path);
reader->startReadMP4();
return MediaSource::find(schema, vhost, app, stream);
} catch (std::exception &ex) {

View File

@@ -24,13 +24,9 @@ using namespace std;
namespace mediakit {
PlayerProxy::PlayerProxy(
const string &vhost, const string &app, const string &stream_id, const ProtocolOption &option, int retry_count,
const MediaTuple &tuple, const ProtocolOption &option, int retry_count,
const EventPoller::Ptr &poller, int reconnect_delay_min, int reconnect_delay_max, int reconnect_delay_step)
: MediaPlayer(poller)
, _option(option) {
_tuple.vhost = vhost;
_tuple.app = app;
_tuple.stream = stream_id;
: MediaPlayer(poller), _tuple(tuple), _option(option) {
_retry_count = retry_count;
setOnClose(nullptr);

View File

@@ -66,9 +66,9 @@ public:
// 如果retry_count<0,则一直重试播放否则重试retry_count次数
// 默认一直重试
PlayerProxy(
const std::string &vhost, const std::string &app, const std::string &stream_id, const ProtocolOption &option, int retry_count = -1,
const toolkit::EventPoller::Ptr &poller = nullptr, int reconnect_delay_min = 2, int reconnect_delay_max = 60, int reconnect_delay_step = 3);
PlayerProxy(const MediaTuple &tuple, const ProtocolOption &option, int retry_count = -1,
const toolkit::EventPoller::Ptr &poller = nullptr,
int reconnect_delay_min = 2, int reconnect_delay_max = 60, int reconnect_delay_step = 3);
~PlayerProxy() override;

View File

@@ -20,7 +20,7 @@ using namespace toolkit;
namespace mediakit {
MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std::string &stream_id, const string &file_path,
MP4Reader::MP4Reader(const MediaTuple &tuple, const string &file_path,
toolkit::EventPoller::Ptr poller) {
ProtocolOption option;
// 读取mp4文件并流化时不重复生成mp4/hls文件
@@ -29,16 +29,15 @@ MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std
option.enable_hls_fmp4 = false;
// mp4支持多track
option.max_track = 16;
setup(vhost, app, stream_id, file_path, option, std::move(poller));
setup(tuple, file_path, option, std::move(poller));
}
MP4Reader::MP4Reader(const std::string &vhost, const std::string &app, const std::string &stream_id, const string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller) {
setup(vhost, app, stream_id, file_path, option, std::move(poller));
MP4Reader::MP4Reader(const MediaTuple &tuple, const string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller) {
setup(tuple, file_path, option, std::move(poller));
}
void MP4Reader::setup(const std::string &vhost, const std::string &app, const std::string &stream_id, const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller) {
void MP4Reader::setup(const MediaTuple &tuple, const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller) {
//读写文件建议放在后台线程
auto tuple = MediaTuple{vhost, app, stream_id, ""};
_poller = poller ? std::move(poller) : WorkThreadPool::Instance().getPoller();
_file_path = file_path;
if (_file_path.empty()) {

View File

@@ -28,11 +28,9 @@ public:
* @param stream_id 流id,置空时,只解复用mp4,但是不生成MediaSource
* @param file_path 文件路径,如果为空则根据配置文件和上面参数自动生成,否则使用指定的文件
*/
MP4Reader(const std::string &vhost, const std::string &app, const std::string &stream_id,
const std::string &file_path = "", toolkit::EventPoller::Ptr poller = nullptr);
MP4Reader(const MediaTuple &tuple, const std::string &file_path = "", toolkit::EventPoller::Ptr poller = nullptr);
MP4Reader(const std::string &vhost, const std::string &app, const std::string &stream_id,
const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller = nullptr);
MP4Reader(const MediaTuple &tuple, const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller = nullptr);
/**
* 开始解复用MP4文件
@@ -69,7 +67,7 @@ private:
void setCurrentStamp(uint32_t stamp);
bool seekTo(uint32_t stamp_seek);
void setup(const std::string &vhost, const std::string &app, const std::string &stream_id, const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller);
void setup(const MediaTuple &tuple, const std::string &file_path, const ProtocolOption &option, toolkit::EventPoller::Ptr poller);
private:
bool _file_repeat = false;

View File

@@ -99,10 +99,10 @@ RtpMultiCaster::~RtpMultiCaster() {
DebugL;
}
RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, vhost, app, stream));
RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, const MediaTuple &tuple, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port) {
auto src = dynamic_pointer_cast<RtspMediaSource>(MediaSource::find(RTSP_SCHEMA, tuple.vhost, tuple.app, tuple.stream));
if (!src) {
auto err = StrPrinter << "未找到媒体源:" << vhost << " " << app << " " << stream << endl;
auto err = StrPrinter << "未找到媒体源:" << tuple.shortUrl() << endl;
throw std::runtime_error(err);
}
_multicast_ip = (multicast_ip) ? make_shared<uint32_t>(multicast_ip) : MultiCastAddressMaker::Instance().obtain();
@@ -144,7 +144,7 @@ RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, con
});
});
string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl;
string strKey = StrPrinter << local_ip << " " << tuple.vhost << " " << tuple.app << " " << tuple.stream << endl;
_rtp_reader->setDetachCB([this, strKey]() {
{
lock_guard<recursive_mutex> lck(g_mtx);
@@ -167,7 +167,7 @@ RtpMultiCaster::RtpMultiCaster(SocketHelper &helper, const string &local_ip, con
DebugL << MultiCastAddressMaker::toString(*_multicast_ip) << " "
<< _udp_sock[0]->get_local_port() << " "
<< _udp_sock[1]->get_local_port() << " "
<< vhost << " " << app << " " << stream;
<< tuple.shortUrl();
}
uint16_t RtpMultiCaster::getMultiCasterPort(TrackType trackType) {
@@ -180,17 +180,17 @@ string RtpMultiCaster::getMultiCasterIP() {
return SockUtil::inet_ntoa(addr);
}
RtpMultiCaster::Ptr RtpMultiCaster::get(SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port) {
static auto on_create = [](SocketHelper &helper, const string &local_ip, const string &vhost, const string &app, const string &stream, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port){
RtpMultiCaster::Ptr RtpMultiCaster::get(SocketHelper &helper, const string &local_ip, const MediaTuple &tuple, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port) {
static auto on_create = [](SocketHelper &helper, const string &local_ip, const MediaTuple &tuple, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port){
try {
auto poller = helper.getPoller();
auto ret = RtpMultiCaster::Ptr(new RtpMultiCaster(helper, local_ip, vhost, app, stream, multicast_ip, video_port, audio_port), [poller](RtpMultiCaster *ptr) {
auto ret = RtpMultiCaster::Ptr(new RtpMultiCaster(helper, local_ip, tuple, multicast_ip, video_port, audio_port), [poller](RtpMultiCaster *ptr) {
poller->async([ptr]() {
delete ptr;
});
});
lock_guard<recursive_mutex> lck(g_mtx);
string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl;
string strKey = StrPrinter << local_ip << " " << tuple.vhost << " " << tuple.app << " " << tuple.stream << endl;
g_multi_caster_map.emplace(strKey, ret);
return ret;
} catch (std::exception &ex) {
@@ -199,16 +199,16 @@ RtpMultiCaster::Ptr RtpMultiCaster::get(SocketHelper &helper, const string &loca
}
};
string strKey = StrPrinter << local_ip << " " << vhost << " " << app << " " << stream << endl;
string strKey = StrPrinter << local_ip << " " << tuple.vhost << " " << tuple.app << " " << tuple.stream << endl;
lock_guard<recursive_mutex> lck(g_mtx);
auto it = g_multi_caster_map.find(strKey);
if (it == g_multi_caster_map.end()) {
return on_create(helper, local_ip, vhost, app, stream, multicast_ip, video_port, audio_port);
return on_create(helper, local_ip, tuple, multicast_ip, video_port, audio_port);
}
auto ret = it->second.lock();
if (!ret) {
g_multi_caster_map.erase(it);
return on_create(helper, local_ip, vhost, app, stream, multicast_ip, video_port, audio_port);
return on_create(helper, local_ip, tuple, multicast_ip, video_port, audio_port);
}
return ret;
}

View File

@@ -45,14 +45,14 @@ public:
~RtpMultiCaster();
static Ptr get(toolkit::SocketHelper &helper, const std::string &local_ip, const std::string &vhost, const std::string &app, const std::string &stream, uint32_t multicast_ip = 0, uint16_t video_port = 0, uint16_t audio_port = 0);
static Ptr get(toolkit::SocketHelper &helper, const std::string &local_ip, const MediaTuple &tuple, uint32_t multicast_ip = 0, uint16_t video_port = 0, uint16_t audio_port = 0);
void setDetachCB(void *listener,const onDetach &cb);
std::string getMultiCasterIP();
uint16_t getMultiCasterPort(TrackType trackType);
private:
RtpMultiCaster(toolkit::SocketHelper &helper, const std::string &local_ip, const std::string &vhost, const std::string &app, const std::string &stream, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port);
RtpMultiCaster(toolkit::SocketHelper &helper, const std::string &local_ip, const MediaTuple &tuple, uint32_t multicast_ip, uint16_t video_port, uint16_t audio_port);
private:
std::recursive_mutex _mtx;

View File

@@ -742,7 +742,7 @@ void RtspSession::handleReq_Setup(const Parser &parser) {
break;
case Rtsp::RTP_MULTICAST: {
if(!_multicaster){
_multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info.vhost, _media_info.app, _media_info.stream, _multicast_ip, _multicast_video_port, _multicast_audio_port);
_multicaster = RtpMultiCaster::get(*this, get_local_ip(), _media_info, _multicast_ip, _multicast_video_port, _multicast_audio_port);
if (!_multicaster) {
send_NotAcceptable();
throw SockException(Err_shutdown, "can not get a available udp multicast socket");