mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2026-06-30 06:42:22 +08:00
Merge remote-tracking branch 'upstream/master' into master
This commit is contained in:
@@ -43,6 +43,7 @@ MediaSource::MediaSource(const string &schema, const string &vhost, const string
|
||||
_schema = schema;
|
||||
_app = app;
|
||||
_stream_id = stream_id;
|
||||
_create_stamp = time(NULL);
|
||||
}
|
||||
|
||||
MediaSource::~MediaSource() {
|
||||
@@ -66,6 +67,19 @@ const string& MediaSource::getId() const {
|
||||
return _stream_id;
|
||||
}
|
||||
|
||||
int MediaSource::getBytesSpeed(){
|
||||
return _speed.getSpeed();
|
||||
}
|
||||
|
||||
uint64_t MediaSource::getCreateStamp() const {
|
||||
return _create_stamp;
|
||||
}
|
||||
|
||||
uint64_t MediaSource::getAliveSecond() const {
|
||||
//使用Ticker对象获取存活时间的目的是防止修改系统时间导致回退
|
||||
return _ticker.createdTime() / 1000;
|
||||
}
|
||||
|
||||
vector<Track::Ptr> MediaSource::getTracks(bool ready) const {
|
||||
auto listener = _listener.lock();
|
||||
if(!listener){
|
||||
|
||||
@@ -137,6 +137,52 @@ public:
|
||||
string _param_strs;
|
||||
};
|
||||
|
||||
class BytesSpeed {
|
||||
public:
|
||||
BytesSpeed() = default;
|
||||
~BytesSpeed() = default;
|
||||
|
||||
/**
|
||||
* 添加统计字节
|
||||
*/
|
||||
BytesSpeed& operator += (uint64_t bytes) {
|
||||
_bytes += bytes;
|
||||
if (_bytes > 1024 * 1024) {
|
||||
//数据大于1MB就计算一次网速
|
||||
computeSpeed();
|
||||
}
|
||||
return *this;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取速度,单位bytes/s
|
||||
*/
|
||||
int getSpeed() {
|
||||
if (_ticker.elapsedTime() < 1000) {
|
||||
//获取频率小于1秒,那么返回上次计算结果
|
||||
return _speed;
|
||||
}
|
||||
return computeSpeed();
|
||||
}
|
||||
|
||||
private:
|
||||
uint64_t computeSpeed() {
|
||||
auto elapsed = _ticker.elapsedTime();
|
||||
if (!elapsed) {
|
||||
return _speed;
|
||||
}
|
||||
_speed = _bytes * 1000 / elapsed;
|
||||
_ticker.resetTime();
|
||||
_bytes = 0;
|
||||
return _speed;
|
||||
}
|
||||
|
||||
private:
|
||||
int _speed = 0;
|
||||
uint64_t _bytes = 0;
|
||||
Ticker _ticker;
|
||||
};
|
||||
|
||||
/**
|
||||
* 媒体源,任何rtsp/rtmp的直播流都源自该对象
|
||||
*/
|
||||
@@ -170,6 +216,13 @@ public:
|
||||
// 设置时间戳
|
||||
virtual void setTimeStamp(uint32_t stamp) {};
|
||||
|
||||
// 获取数据速率,单位bytes/s
|
||||
int getBytesSpeed();
|
||||
// 获取流创建GMT unix时间戳,单位秒
|
||||
uint64_t getCreateStamp() const;
|
||||
// 获取流上线时间,单位秒
|
||||
uint64_t getAliveSecond() const;
|
||||
|
||||
////////////////MediaSourceEvent相关接口实现////////////////
|
||||
|
||||
// 设置监听者
|
||||
@@ -229,7 +282,12 @@ private:
|
||||
//触发媒体事件
|
||||
void emitEvent(bool regist);
|
||||
|
||||
protected:
|
||||
BytesSpeed _speed;
|
||||
|
||||
private:
|
||||
time_t _create_stamp;
|
||||
Ticker _ticker;
|
||||
string _schema;
|
||||
string _vhost;
|
||||
string _app;
|
||||
|
||||
@@ -60,6 +60,7 @@ const string kBroadcaseProxyPusherFailed = "kBroadcaseProxyPusherFailed";
|
||||
//通用配置项目
|
||||
namespace General{
|
||||
#define GENERAL_FIELD "general."
|
||||
const string kMediaServerId = GENERAL_FIELD"mediaServerId";
|
||||
const string kFlowThreshold = GENERAL_FIELD"flowThreshold";
|
||||
const string kStreamNoneReaderDelayMS = GENERAL_FIELD"streamNoneReaderDelayMS";
|
||||
const string kMaxStreamWaitTimeMS = GENERAL_FIELD"maxStreamWaitMS";
|
||||
@@ -82,6 +83,7 @@ onceToken token([](){
|
||||
mINI::Instance()[kPublishToMP4] = 0;
|
||||
mINI::Instance()[kMergeWriteMS] = 0;
|
||||
mINI::Instance()[kModifyStamp] = 0;
|
||||
mINI::Instance()[kMediaServerId] = makeRandStr(16);
|
||||
},nullptr);
|
||||
|
||||
}//namespace General
|
||||
|
||||
@@ -162,6 +162,8 @@ extern const string kBroadcastReloadConfig;
|
||||
|
||||
////////////通用配置///////////
|
||||
namespace General{
|
||||
//每个流媒体服务器的ID(GUID)
|
||||
extern const string kMediaServerId;
|
||||
//流量汇报事件流量阈值,单位KB,默认1MB
|
||||
extern const string kFlowThreshold;
|
||||
//流无人观看并且超过若干时间后才触发kBroadcastStreamNoneReader事件
|
||||
|
||||
@@ -99,6 +99,7 @@ public:
|
||||
if (key) {
|
||||
_have_video = true;
|
||||
}
|
||||
_speed += packet->size();
|
||||
PacketCache<FMP4Packet, FMP4FlushPolicy>::inputPacket(true, packet, key);
|
||||
}
|
||||
|
||||
|
||||
@@ -105,6 +105,9 @@ void HlsMakerImp::onWriteSegment(const char *data, int len) {
|
||||
if (_file) {
|
||||
fwrite(data, len, 1, _file.get());
|
||||
}
|
||||
if (_media_src) {
|
||||
_media_src->onSegmentSize(len);
|
||||
}
|
||||
}
|
||||
|
||||
void HlsMakerImp::onWriteHls(const char *data, int len) {
|
||||
|
||||
@@ -79,6 +79,10 @@ public:
|
||||
_list_cb.emplace_back(std::move(cb));
|
||||
}
|
||||
|
||||
void onSegmentSize(uint64_t bytes) {
|
||||
_speed += bytes;
|
||||
}
|
||||
|
||||
private:
|
||||
bool _is_regist = false;
|
||||
RingType::Ptr _ring;
|
||||
|
||||
@@ -32,7 +32,7 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s
|
||||
}
|
||||
//Here we use the customized file path.
|
||||
if (!customized_path.empty()) {
|
||||
m3u8FilePath = customized_path + "/hls.m3u8";
|
||||
return File::absolutePath(m3u8FilePath, customized_path);
|
||||
}
|
||||
return File::absolutePath(m3u8FilePath, hlsPath);
|
||||
}
|
||||
@@ -47,7 +47,7 @@ string Recorder::getRecordPath(Recorder::type type, const string &vhost, const s
|
||||
}
|
||||
//Here we use the customized file path.
|
||||
if (!customized_path.empty()) {
|
||||
mp4FilePath = customized_path + "/";
|
||||
return File::absolutePath(mp4FilePath, customized_path);
|
||||
}
|
||||
return File::absolutePath(mp4FilePath, recordPath);
|
||||
}
|
||||
|
||||
@@ -47,7 +47,7 @@ public:
|
||||
* @param vhost 虚拟主机
|
||||
* @param app 应用名
|
||||
* @param stream_id 流id
|
||||
* @param customized_path 录像文件保存自定义目录,默认为空则自动生成
|
||||
* @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置
|
||||
* @return 录制文件绝对路径
|
||||
*/
|
||||
static string getRecordPath(type type, const string &vhost, const string &app, const string &stream_id,const string &customized_path = "");
|
||||
@@ -58,7 +58,7 @@ public:
|
||||
* @param vhost 虚拟主机
|
||||
* @param app 应用名
|
||||
* @param stream_id 流id
|
||||
* @param customized_path 录像文件保存自定义目录,默认为空则自动生成
|
||||
* @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置
|
||||
* @return 对象指针,可能为nullptr
|
||||
*/
|
||||
static std::shared_ptr<MediaSinkInterface> createRecorder(type type, const string &vhost, const string &app, const string &stream_id, const string &customized_path = "");
|
||||
@@ -79,7 +79,7 @@ public:
|
||||
* @param vhost 虚拟主机
|
||||
* @param app 应用名
|
||||
* @param stream_id 流id
|
||||
* @param customized_path 录像文件保存自定义目录,默认为空则自动生成
|
||||
* @param customized_path 录像文件保存自定义根目录,为空则采用配置文件设置
|
||||
* @return 成功与否
|
||||
*/
|
||||
static bool startRecord(type type, const string &vhost, const string &app, const string &stream_id,const string &customized_path);
|
||||
|
||||
@@ -119,6 +119,7 @@ public:
|
||||
* @param pkt rtmp包
|
||||
*/
|
||||
void onWrite(const RtmpPacket::Ptr &pkt, bool = true) override {
|
||||
_speed += pkt->size();
|
||||
//保存当前时间戳
|
||||
switch (pkt->type_id) {
|
||||
case MSG_VIDEO : _track_stamps[TrackVideo] = pkt->time_stamp, _have_video = true; break;
|
||||
|
||||
@@ -11,11 +11,6 @@
|
||||
#include "Common/config.h"
|
||||
#include "RtpReceiver.h"
|
||||
|
||||
#define POP_HEAD(trackidx) \
|
||||
auto it = _rtp_sort_cache_map[trackidx].begin(); \
|
||||
onRtpSorted(it->second, trackidx); \
|
||||
_rtp_sort_cache_map[trackidx].erase(it);
|
||||
|
||||
#define AV_RB16(x) \
|
||||
((((const uint8_t*)(x))[0] << 8) | \
|
||||
((const uint8_t*)(x))[1])
|
||||
@@ -24,7 +19,18 @@
|
||||
|
||||
namespace mediakit {
|
||||
|
||||
RtpReceiver::RtpReceiver() {}
|
||||
RtpReceiver::RtpReceiver() {
|
||||
GET_CONFIG(uint32_t, clearCount, Rtp::kClearCount);
|
||||
GET_CONFIG(uint32_t, maxRtpCount, Rtp::kMaxRtpCount);
|
||||
int index = 0;
|
||||
for (auto &sortor : _rtp_sortor) {
|
||||
sortor.setup(maxRtpCount, clearCount);
|
||||
sortor.setOnSort([this, index](uint16_t seq, const RtpPacket::Ptr &packet) {
|
||||
onRtpSorted(packet, index);
|
||||
});
|
||||
++index;
|
||||
}
|
||||
}
|
||||
RtpReceiver::~RtpReceiver() {}
|
||||
|
||||
bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate, unsigned char *rtp_raw_ptr, unsigned int rtp_raw_len) {
|
||||
@@ -80,7 +86,7 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
|
||||
if (_ssrc_err_count[track_index]++ > 10) {
|
||||
//ssrc切换后清除老数据
|
||||
WarnL << "ssrc更换:" << _ssrc[track_index] << " -> " << rtp.ssrc;
|
||||
_rtp_sort_cache_map[track_index].clear();
|
||||
_rtp_sortor[track_index].clear();
|
||||
_ssrc[track_index] = rtp.ssrc;
|
||||
}
|
||||
return false;
|
||||
@@ -127,56 +133,15 @@ bool RtpReceiver::handleOneRtp(int track_index, TrackType type, int samplerate,
|
||||
}
|
||||
|
||||
void RtpReceiver::sortRtp(const RtpPacket::Ptr &rtp,int track_index){
|
||||
if(rtp->sequence != _last_seq[track_index] + 1 && _last_seq[track_index] != 0){
|
||||
//包乱序或丢包
|
||||
_seq_ok_count[track_index] = 0;
|
||||
_sort_started[track_index] = true;
|
||||
if(_last_seq[track_index] > rtp->sequence && _last_seq[track_index] - rtp->sequence > 0xFF){
|
||||
//sequence回环,清空所有排序缓存
|
||||
while (_rtp_sort_cache_map[track_index].size()) {
|
||||
POP_HEAD(track_index)
|
||||
}
|
||||
++_seq_cycle_count[track_index];
|
||||
}
|
||||
}else{
|
||||
//正确序列的包
|
||||
_seq_ok_count[track_index]++;
|
||||
}
|
||||
|
||||
_last_seq[track_index] = rtp->sequence;
|
||||
|
||||
//开始排序缓存
|
||||
if (_sort_started[track_index]) {
|
||||
_rtp_sort_cache_map[track_index].emplace(rtp->sequence, rtp);
|
||||
GET_CONFIG(uint32_t,clearCount,Rtp::kClearCount);
|
||||
GET_CONFIG(uint32_t,maxRtpCount,Rtp::kMaxRtpCount);
|
||||
if (_seq_ok_count[track_index] >= clearCount) {
|
||||
//网络环境改善,需要清空排序缓存
|
||||
_seq_ok_count[track_index] = 0;
|
||||
_sort_started[track_index] = false;
|
||||
while (_rtp_sort_cache_map[track_index].size()) {
|
||||
POP_HEAD(track_index)
|
||||
}
|
||||
} else if (_rtp_sort_cache_map[track_index].size() >= maxRtpCount) {
|
||||
//排序缓存溢出
|
||||
POP_HEAD(track_index)
|
||||
}
|
||||
}else{
|
||||
//正确序列
|
||||
onRtpSorted(rtp, track_index);
|
||||
}
|
||||
_rtp_sortor[track_index].sortPacket(rtp->sequence, rtp);
|
||||
}
|
||||
|
||||
void RtpReceiver::clear() {
|
||||
CLEAR_ARR(_last_seq);
|
||||
CLEAR_ARR(_ssrc);
|
||||
CLEAR_ARR(_ssrc_err_count);
|
||||
CLEAR_ARR(_seq_ok_count);
|
||||
CLEAR_ARR(_sort_started);
|
||||
CLEAR_ARR(_seq_cycle_count);
|
||||
|
||||
_rtp_sort_cache_map[0].clear();
|
||||
_rtp_sort_cache_map[1].clear();
|
||||
for (auto &sortor : _rtp_sortor) {
|
||||
sortor.clear();
|
||||
}
|
||||
}
|
||||
|
||||
void RtpReceiver::setPoolSize(int size) {
|
||||
@@ -184,11 +149,11 @@ void RtpReceiver::setPoolSize(int size) {
|
||||
}
|
||||
|
||||
int RtpReceiver::getJitterSize(int track_index){
|
||||
return _rtp_sort_cache_map[track_index].size();
|
||||
return _rtp_sortor[track_index].getJitterSize();
|
||||
}
|
||||
|
||||
int RtpReceiver::getCycleCount(int track_index){
|
||||
return _seq_cycle_count[track_index];
|
||||
return _rtp_sortor[track_index].getCycleCount();
|
||||
}
|
||||
|
||||
|
||||
|
||||
@@ -11,24 +11,141 @@
|
||||
#ifndef ZLMEDIAKIT_RTPRECEIVER_H
|
||||
#define ZLMEDIAKIT_RTPRECEIVER_H
|
||||
|
||||
|
||||
#include <map>
|
||||
#include <string>
|
||||
#include <memory>
|
||||
#include "RtpCodec.h"
|
||||
#include "RtspMediaSource.h"
|
||||
|
||||
using namespace std;
|
||||
using namespace toolkit;
|
||||
|
||||
namespace mediakit {
|
||||
|
||||
template<typename T, typename SEQ = uint16_t>
|
||||
class PacketSortor {
|
||||
public:
|
||||
PacketSortor() = default;
|
||||
~PacketSortor() = default;
|
||||
|
||||
/**
|
||||
* 设置参数
|
||||
* @param max_sort_size 最大排序缓存长度
|
||||
* @param clear_sort_size seq连续次数超过该值后,清空并关闭排序缓存
|
||||
*/
|
||||
void setup(uint32_t max_sort_size, uint32_t clear_sort_size) {
|
||||
_max_sort_size = max_sort_size;
|
||||
_clear_sort_size = clear_sort_size;
|
||||
}
|
||||
|
||||
void setOnSort(function<void(SEQ seq, const T &packet)> cb){
|
||||
_cb = std::move(cb);
|
||||
}
|
||||
|
||||
/**
|
||||
* 清空状态
|
||||
*/
|
||||
void clear() {
|
||||
_last_seq = 0;
|
||||
_seq_ok_count = 0;
|
||||
_sort_started = 0;
|
||||
_seq_cycle_count = 0;
|
||||
_rtp_sort_cache_map.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取排序缓存长度
|
||||
*/
|
||||
int getJitterSize(){
|
||||
return _rtp_sort_cache_map.size();
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取seq回环次数
|
||||
*/
|
||||
int getCycleCount(){
|
||||
return _seq_cycle_count;
|
||||
}
|
||||
|
||||
/**
|
||||
* 输入并排序
|
||||
* @param seq 序列号
|
||||
* @param packet 包负载
|
||||
*/
|
||||
void sortPacket(SEQ seq, const T &packet){
|
||||
if (seq != _last_seq + 1 && _last_seq != 0) {
|
||||
//包乱序或丢包
|
||||
_seq_ok_count = 0;
|
||||
_sort_started = true;
|
||||
if (_last_seq > seq && _last_seq - seq > 0xFF) {
|
||||
//sequence回环,清空所有排序缓存
|
||||
while (_rtp_sort_cache_map.size()) {
|
||||
popPacket();
|
||||
}
|
||||
++_seq_cycle_count;
|
||||
}
|
||||
} else {
|
||||
//正确序列的包
|
||||
_seq_ok_count++;
|
||||
}
|
||||
|
||||
_last_seq = seq;
|
||||
|
||||
//开始排序缓存
|
||||
if (_sort_started) {
|
||||
_rtp_sort_cache_map.emplace(seq, packet);
|
||||
if (_seq_ok_count >= _clear_sort_size) {
|
||||
//网络环境改善,需要清空排序缓存
|
||||
_seq_ok_count = 0;
|
||||
_sort_started = false;
|
||||
while (_rtp_sort_cache_map.size()) {
|
||||
popPacket();
|
||||
}
|
||||
} else if (_rtp_sort_cache_map.size() >= _max_sort_size) {
|
||||
//排序缓存溢出
|
||||
popPacket();
|
||||
}
|
||||
} else {
|
||||
//正确序列
|
||||
onPacketSorted(seq, packet);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
void popPacket() {
|
||||
auto it = _rtp_sort_cache_map.begin();
|
||||
onPacketSorted(it->first, it->second);
|
||||
_rtp_sort_cache_map.erase(it);
|
||||
}
|
||||
|
||||
void onPacketSorted(SEQ seq, const T &packet) {
|
||||
_cb(seq, packet);
|
||||
}
|
||||
|
||||
private:
|
||||
//是否开始seq排序
|
||||
bool _sort_started = false;
|
||||
//上次seq
|
||||
SEQ _last_seq = 0;
|
||||
//seq连续次数计数
|
||||
uint32_t _seq_ok_count = 0;
|
||||
//seq回环次数计数
|
||||
uint32_t _seq_cycle_count = 0;
|
||||
//排序缓存长度
|
||||
uint32_t _max_sort_size;
|
||||
//seq连续次数超过该值后,清空并关闭排序缓存
|
||||
uint32_t _clear_sort_size;
|
||||
//rtp排序缓存,根据seq排序
|
||||
map<SEQ, T> _rtp_sort_cache_map;
|
||||
//回调
|
||||
function<void(SEQ seq, const T &packet)> _cb;
|
||||
};
|
||||
|
||||
class RtpReceiver {
|
||||
public:
|
||||
RtpReceiver();
|
||||
virtual ~RtpReceiver();
|
||||
protected:
|
||||
|
||||
protected:
|
||||
/**
|
||||
* 输入数据指针生成并排序rtp包
|
||||
* @param track_index track下标索引
|
||||
@@ -46,6 +163,7 @@ protected:
|
||||
* @param track_index track索引
|
||||
*/
|
||||
virtual void onRtpSorted(const RtpPacket::Ptr &rtp, int track_index){}
|
||||
|
||||
void clear();
|
||||
void setPoolSize(int size);
|
||||
int getJitterSize(int track_index);
|
||||
@@ -58,16 +176,8 @@ private:
|
||||
uint32_t _ssrc[2] = { 0, 0 };
|
||||
//ssrc不匹配计数
|
||||
uint32_t _ssrc_err_count[2] = { 0, 0 };
|
||||
//上次seq
|
||||
uint16_t _last_seq[2] = { 0 , 0 };
|
||||
//seq连续次数计数
|
||||
uint32_t _seq_ok_count[2] = { 0 , 0};
|
||||
//seq回环次数计数
|
||||
uint32_t _seq_cycle_count[2] = { 0 , 0};
|
||||
//是否开始seq排序
|
||||
bool _sort_started[2] = { 0 , 0};
|
||||
//rtp排序缓存,根据seq排序
|
||||
map<uint16_t , RtpPacket::Ptr> _rtp_sort_cache_map[2];
|
||||
PacketSortor<RtpPacket::Ptr> _rtp_sortor[2];
|
||||
//rtp循环池
|
||||
RtspMediaSource::PoolType _rtp_pool;
|
||||
};
|
||||
|
||||
@@ -157,6 +157,7 @@ public:
|
||||
* @param keyPos 该包是否为关键帧的第一个包
|
||||
*/
|
||||
void onWrite(const RtpPacket::Ptr &rtp, bool keyPos) override {
|
||||
_speed += rtp->size();
|
||||
assert(rtp->type >= 0 && rtp->type < TrackMax);
|
||||
auto track = _tracks[rtp->type];
|
||||
if (track) {
|
||||
|
||||
@@ -76,6 +76,7 @@ public:
|
||||
* @param key 是否为关键帧第一个包
|
||||
*/
|
||||
void onWrite(const TSPacket::Ptr &packet, bool key) override {
|
||||
_speed += packet->size();
|
||||
if (!_ring) {
|
||||
createRing();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user