重写mp4录制驱动机制

This commit is contained in:
xiongziliang
2019-12-04 18:36:30 +08:00
parent d5a81d7105
commit b3fcb4c038
8 changed files with 384 additions and 52 deletions

View File

@@ -26,13 +26,6 @@
#include "Recorder.h"
#include "Common/config.h"
#include "Http/HttpSession.h"
#include "Util/util.h"
#include "Util/mini.h"
#include "Network/sockutil.h"
#include "HlsMakerImp.h"
#include "Player/PlayerBase.h"
#include "Common/MediaSink.h"
#include "MP4Recorder.h"
#include "HlsRecorder.h"
@@ -40,7 +33,7 @@ using namespace toolkit;
namespace mediakit {
MediaSinkInterface *Recorder::createHlsRecorder(const string &strVhost_tmp, const string &strApp, const string &strId) {
MediaSinkInterface *createHlsRecorder(const string &strVhost_tmp, const string &strApp, const string &strId) {
#if defined(ENABLE_HLS)
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
GET_CONFIG(string, hlsPath, Hls::kFilePath);
@@ -66,7 +59,7 @@ MediaSinkInterface *Recorder::createHlsRecorder(const string &strVhost_tmp, cons
#endif //defined(ENABLE_HLS)
}
MediaSinkInterface *Recorder::createMP4Recorder(const string &strVhost_tmp, const string &strApp, const string &strId) {
MediaSinkInterface *createMP4Recorder(const string &strVhost_tmp, const string &strApp, const string &strId) {
#if defined(ENABLE_MP4RECORD)
GET_CONFIG(bool, enableVhost, General::kEnableVhost);
GET_CONFIG(string, recordPath, Record::kFilePath);
@@ -91,5 +84,286 @@ MediaSinkInterface *Recorder::createMP4Recorder(const string &strVhost_tmp, cons
#endif //defined(ENABLE_MP4RECORD)
}
////////////////////////////////////////////////////////////////////////////////////////
class RecorderHelper {
public:
typedef std::shared_ptr<RecorderHelper> Ptr;
/**
* 构建函数
* @param bContinueRecord false表明hls录制从头开始录制(意味着hls临时文件在媒体反注册时会被删除)
*/
RecorderHelper(const MediaSinkInterface::Ptr &recorder, vector<Track::Ptr> &&tracks , bool bContinueRecord, const string &schema) {
_recorder = recorder;
_continueRecord = bContinueRecord;
_schema = schema;
attachTracks(std::move(tracks));
}
~RecorderHelper() {
resetTracks();
}
// 附则于track上
void attachTracks(vector<Track::Ptr> &&tracks){
if(isTracksSame(tracks)){
return;
}
resetTracks();
_tracks = std::move(tracks);
for (auto &track : _tracks) {
_recorder->addTrack(track);
track->addDelegate(_recorder);
}
}
// 判断新的tracks是否与之前的一致
bool isTracksSame(const vector<Track::Ptr> &tracks){
if(tracks.size() != _tracks.size()) {
return false;
}
int i = 0;
for(auto &track : tracks){
if(track != _tracks[i++]){
return false;
}
}
return true;
}
// 重置所有track
void resetTracks(){
if(_tracks.empty()){
return;
}
for (auto &track : _tracks) {
track->delDelegate(_recorder.get());
}
_tracks.clear();
_recorder->resetTracks();
}
// 返回false表明hls录制从头开始录制(意味着hls临时文件在媒体反注册时会被删除)
bool continueRecord(){
return _continueRecord;
}
bool isRecording() {
return !_tracks.empty();
}
const string &getSchema() const{
return _schema;
}
private:
MediaSinkInterface::Ptr _recorder;
vector<Track::Ptr> _tracks;
bool _continueRecord;
string _schema;
};
template<Recorder::type type>
class MediaSourceWatcher {
public:
static MediaSourceWatcher& Instance(){
static MediaSourceWatcher instance;
return instance;
}
Recorder::status getRecordStatus(const string &vhost, const string &app, const string &stream_id) {
return getRecordStatus_l(getRecorderKey(vhost, app, stream_id));
}
int startRecord(const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) {
auto key = getRecorderKey(vhost, app, stream_id);
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
if (getRecordStatus_l(key) != Recorder::status_not_record) {
// 已经在录制了
return 0;
}
string schema;
auto tracks = findTracks(vhost, app, stream_id,schema);
if (!waitForRecord && tracks.empty()) {
// 暂时无法开启录制
return -1;
}
auto recorder = MediaSinkInterface::Ptr(createRecorder(vhost, app, stream_id));
if (!recorder) {
// 创建录制器失败
return -2;
}
_recorder_map[key] = std::make_shared<RecorderHelper>(recorder, std::move(tracks), continueRecord, schema);
return 0;
}
void stopRecord(const string &vhost, const string &app, const string &stream_id) {
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
_recorder_map.erase(getRecorderKey(vhost, app, stream_id));
}
private:
MediaSourceWatcher(){
NoticeCenter::Instance().addListener(this,Broadcast::kBroadcastMediaChanged,[this](BroadcastMediaChangedArgs){
if(bRegist){
onRegist(schema,vhost,app,stream,sender);
}else{
onUnRegist(schema,vhost,app,stream,sender);
}
});
NoticeCenter::Instance().addListener(this,Broadcast::kBroadcastMediaResetTracks,[this](BroadcastMediaResetTracksArgs){
onRegist(schema,vhost,app,stream,sender);
});
}
~MediaSourceWatcher(){
NoticeCenter::Instance().delListener(this,Broadcast::kBroadcastMediaChanged);
NoticeCenter::Instance().delListener(this,Broadcast::kBroadcastMediaResetTracks);
}
void onRegist(const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender){
auto key = getRecorderKey(vhost,app,stream);
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
auto it = _recorder_map.find(key);
if(it == _recorder_map.end()){
// 录像记录不存在
return;
}
auto tracks = sender.getTracks(true);
if (tracks.empty()) {
// 无有效的tracks
return;
}
auto &helper = it->second;
if(!helper){
// 对象不存在,创建之
auto recorder = MediaSinkInterface::Ptr(createRecorder(vhost, app, stream));
if (recorder) {
_recorder_map[key] = std::make_shared<RecorderHelper>(recorder, std::move(tracks), false, schema);
}
return;
}
if(helper->getSchema() == schema){
// 对象存在且绑定的协议一致,替换tracks
helper->attachTracks(std::move(tracks));
}
}
void onUnRegist(const string &schema,const string &vhost,const string &app,const string &stream,MediaSource &sender){
auto key = getRecorderKey(vhost,app,stream);
lock_guard<decltype(_recorder_mtx)> lck(_recorder_mtx);
auto it = _recorder_map.find(key);
if(it == _recorder_map.end()){
// 录像记录不存在
return;
}
if(!it->second){
// 录像对象为空,已经停止录制
return;
}
if(it->second->continueRecord()){
// 如果可以继续录制那么只重置tracks,不删除对象
it->second->resetTracks();
}else{
// 删除对象(意味着可能删除hls临时文件)
it->second.reset();
}
}
Recorder::status getRecordStatus_l(const string &key) {
auto it = _recorder_map.find(key);
if (it == _recorder_map.end()) {
return Recorder::status_not_record;
}
if (it->second && it->second->isRecording()) {
return Recorder::status_recording;
}
return Recorder::status_wait_record;
}
// 查找MediaSource以便录制
vector<Track::Ptr> findTracks(const string &vhost, const string &app, const string &stream_id,string &schema) {
auto src = MediaSource::find(RTMP_SCHEMA, vhost, app, stream_id);
if (src) {
auto ret = src->getTracks(true);
if (!ret.empty()) {
schema = RTMP_SCHEMA;
return std::move(ret);
}
}
src = MediaSource::find(RTSP_SCHEMA, vhost, app, stream_id);
if (src) {
schema = RTSP_SCHEMA;
return src->getTracks(true);
}
return vector<Track::Ptr>();
}
string getRecorderKey(const string &vhost, const string &app, const string &stream_id) {
return vhost + "/" + app + "/" + stream_id;
}
MediaSinkInterface *createRecorder(const string &vhost, const string &app, const string &stream_id) {
MediaSinkInterface *ret = nullptr;
switch (type) {
case Recorder::type_hls:
ret = createHlsRecorder(vhost, app, stream_id);
break;
case Recorder::type_mp4:
ret = createMP4Recorder(vhost, app, stream_id);
break;
default:
break;
}
if(!ret){
WarnL << "can not recorder of: " << type;
}
return ret;
}
private:
recursive_mutex _recorder_mtx;
unordered_map<string, RecorderHelper::Ptr> _recorder_map;
};
Recorder::status Recorder::getRecordStatus(Recorder::type type, const string &vhost, const string &app, const string &stream_id) {
switch (type){
case type_mp4:
return MediaSourceWatcher<type_mp4>::Instance().getRecordStatus(vhost,app,stream_id);
case type_hls:
return MediaSourceWatcher<type_hls>::Instance().getRecordStatus(vhost,app,stream_id);
}
return status_not_record;
}
int Recorder::startRecord(Recorder::type type, const string &vhost, const string &app, const string &stream_id, bool waitForRecord, bool continueRecord) {
switch (type){
case type_mp4:
return MediaSourceWatcher<type_mp4>::Instance().startRecord(vhost,app,stream_id,waitForRecord,continueRecord);
case type_hls:
return MediaSourceWatcher<type_hls>::Instance().startRecord(vhost,app,stream_id,waitForRecord,continueRecord);
}
return -3;
}
void Recorder::stopRecord(Recorder::type type, const string &vhost, const string &app, const string &stream_id) {
switch (type){
case type_mp4:
return MediaSourceWatcher<type_mp4>::Instance().stopRecord(vhost,app,stream_id);
case type_hls:
return MediaSourceWatcher<type_hls>::Instance().stopRecord(vhost,app,stream_id);
}
}
} /* namespace mediakit */