新增平滑发送逻辑 (#3072)

This commit is contained in:
夏楚
2023-12-02 10:20:06 +08:00
committed by GitHub
parent 00e6ca3f79
commit 21c03f772f
7 changed files with 117 additions and 1 deletions

View File

@@ -32,6 +32,87 @@ public:
};
} // namespace
class FramePacedSender : public FrameWriterInterface, public std::enable_shared_from_this<FramePacedSender> {
public:
using OnFrame = std::function<void(const Frame::Ptr &frame)>;
// 最小缓存100ms数据
static constexpr auto kMinCacheMS = 100;
FramePacedSender(uint32_t paced_sender_ms, OnFrame cb) {
_paced_sender_ms = paced_sender_ms;
_cb = std::move(cb);
}
void resetTimer(const EventPoller::Ptr &poller) {
std::weak_ptr<FramePacedSender> weak_self = shared_from_this();
_timer = std::make_shared<Timer>(_paced_sender_ms / 1000.0f, [weak_self]() {
if (auto strong_self = weak_self.lock()) {
strong_self->onTick();
return true;
}
return false;
}, poller);
}
bool inputFrame(const Frame::Ptr &frame) override {
if (!_timer) {
setCurrentStamp(frame->dts());
resetTimer(EventPoller::getCurrentPoller());
}
_cache.emplace_back(frame->dts() + _cache_ms, Frame::getCacheAbleFrame(frame));
return true;
}
private:
void onTick() {
auto dst = _cache.empty() ? 0 : _cache.back().first;
while (!_cache.empty()) {
auto &front = _cache.front();
if (getCurrentStamp() < front.first) {
// 还没到消费时间
break;
}
// 时间到了该消费frame了
_cb(front.second);
_cache.pop_front();
}
if (_cache.empty() && dst) {
// 消费太快,需要增加缓存大小
setCurrentStamp(dst);
_cache_ms += kMinCacheMS;
}
// 消费太慢需要强制flush数据
if (_cache.size() > 25 * 5) {
WarnL << "Flush frame paced sender cache: " << _cache.size();
while (!_cache.empty()) {
auto &front = _cache.front();
_cb(front.second);
_cache.pop_front();
}
setCurrentStamp(dst);
}
}
uint64_t getCurrentStamp() { return _ticker.elapsedTime() + _stamp_offset; }
void setCurrentStamp(uint64_t stamp) {
_stamp_offset = stamp;
_ticker.resetTime();
}
private:
uint32_t _paced_sender_ms;
uint32_t _cache_ms = kMinCacheMS;
uint64_t _stamp_offset = 0;
OnFrame _cb;
Ticker _ticker;
Timer::Ptr _timer;
std::list<std::pair<uint64_t, Frame::Ptr>> _cache;
};
static std::shared_ptr<MediaSinkInterface> makeRecorder(MediaSource &sender, const vector<Track::Ptr> &tracks, Recorder::type type, const ProtocolOption &option){
auto recorder = Recorder::createRecorder(type, sender.getMediaTuple(), option);
for (auto &track : tracks) {
@@ -367,6 +448,9 @@ EventPoller::Ptr MultiMediaSourceMuxer::getOwnerPoller(MediaSource &sender) {
if (ret != _poller) {
WarnL << "OwnerPoller changed " << _poller->getThreadName() << " -> " << ret->getThreadName() << " : " << shortUrl();
_poller = ret;
if (_paced_sender) {
_paced_sender->resetTimer(_poller);
}
}
return ret;
} catch (MediaSourceEvent::NotImplemented &) {
@@ -407,6 +491,16 @@ bool MultiMediaSourceMuxer::onTrackReady(const Track::Ptr &track) {
void MultiMediaSourceMuxer::onAllTrackReady() {
CHECK(!_create_in_poller || getOwnerPoller(MediaSource::NullMediaSource())->isCurrentThread());
if (_option.paced_sender_ms) {
std::weak_ptr<MultiMediaSourceMuxer> weak_self = shared_from_this();
_paced_sender = std::make_shared<FramePacedSender>(_option.paced_sender_ms, [weak_self](const Frame::Ptr &frame) {
if (auto strong_self = weak_self.lock()) {
strong_self->onTrackFrame_l(frame);
}
});
}
setMediaListener(getDelegate());
if (_rtmp) {
@@ -492,7 +586,11 @@ void MultiMediaSourceMuxer::resetTracks() {
}
}
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame_in) {
bool MultiMediaSourceMuxer::onTrackFrame(const Frame::Ptr &frame) {
return _paced_sender ? _paced_sender->inputFrame(frame) : onTrackFrame_l(frame);
}
bool MultiMediaSourceMuxer::onTrackFrame_l(const Frame::Ptr &frame_in) {
auto frame = frame_in;
if (_option.modify_stamp != ProtocolOption::kModifyStampOff) {
// 时间戳不采用原始的绝对时间戳