大幅提升接收推流性能以及降低内存占用

This commit is contained in:
xiongziliang
2020-11-01 03:41:35 +08:00
parent 5c7a08eb7c
commit 700a16c759
38 changed files with 173 additions and 193 deletions

View File

@@ -136,7 +136,7 @@ public:
uint32_t ts_field = 0;
uint32_t stream_index;
uint32_t chunk_id;
std::string buffer;
BufferLikeString buffer;
public:
char *data() const override{

View File

@@ -58,8 +58,8 @@ protected:
//from RtmpProtocol
void onRtmpChunk(RtmpPacket &chunk_data) override;
void onStreamDry(uint32_t stream_index) override;
void onSendRawData(const Buffer::Ptr &buffer) override {
send(buffer);
void onSendRawData(Buffer::Ptr buffer) override {
send(std::move(buffer));
}
template<typename FUNC>

View File

@@ -57,8 +57,8 @@ static string openssl_HMACsha256(const void *key, unsigned int key_len, const vo
namespace mediakit {
RtmpProtocol::RtmpProtocol() {
_next_step_func = [this]() {
handle_C0C1();
_next_step_func = [this](const char *data, uint64_t len) {
return handle_C0C1(data, len);
};
}
@@ -84,10 +84,10 @@ void RtmpProtocol::reset() {
//////////Invoke Request//////////
_send_req_id = 0;
//////////Rtmp parser//////////
_recv_data_buf.clear();
HttpRequestSplitter::reset();
_stream_index = STREAM_CONTROL;
_next_step_func = [this]() {
handle_C0C1();
_next_step_func = [this](const char *data, uint64_t len) {
return handle_C0C1(data, len);
};
}
@@ -218,7 +218,7 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P
set_be24(header->body_size, buf->size());
set_le32(header->stream_index, stream_index);
//发送rtmp头
onSendRawData(buffer_header);
onSendRawData(std::move(buffer_header));
//扩展时间戳字段
BufferRaw::Ptr buffer_ext_stamp;
@@ -260,17 +260,20 @@ void RtmpProtocol::sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::P
}
}
void RtmpProtocol::onParseRtmp(const char *data, int size) {
_recv_data_buf.append(data, size);
//移动拷贝提高性能
function<void()> next_step_func(std::move(_next_step_func));
//执行下一步
next_step_func();
void RtmpProtocol::onParseRtmp(const char *data, uint64_t size) {
input(data, size);
}
const char *RtmpProtocol::onSearchPacketTail(const char *data,uint64_t len){
//移动拷贝提高性能
auto next_step_func(std::move(_next_step_func));
//执行下一步
auto ret = next_step_func(data, len);
if (!_next_step_func) {
//为设置下一步,恢复之
next_step_func.swap(_next_step_func);
}
return ret;
}
////for client////
@@ -280,57 +283,57 @@ void RtmpProtocol::startClientSession(const function<void()> &func) {
onSendRawData(obtainBuffer(&handshake_head, 1));
RtmpHandshake c1(0);
onSendRawData(obtainBuffer((char *) (&c1), sizeof(c1)));
_next_step_func = [this, func]() {
_next_step_func = [this, func](const char *data, uint64_t len) {
//等待 S0+S1+S2
handle_S0S1S2(func);
return handle_S0S1S2(data, len, func);
};
}
void RtmpProtocol::handle_S0S1S2(const function<void()> &func) {
if (_recv_data_buf.size() < 1 + 2 * C1_HANDSHARK_SIZE) {
const char* RtmpProtocol::handle_S0S1S2(const char *data, uint64_t len, const function<void()> &func) {
if (len < 1 + 2 * C1_HANDSHARK_SIZE) {
//数据不够
return;
return nullptr;
}
if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) {
if (data[0] != HANDSHAKE_PLAINTEXT) {
throw std::runtime_error("only plaintext[0x03] handshake supported");
}
//发送 C2
const char *pcC2 = _recv_data_buf.data() + 1;
const char *pcC2 = data + 1;
onSendRawData(obtainBuffer(pcC2, C1_HANDSHARK_SIZE));
_recv_data_buf.erase(0, 1 + 2 * C1_HANDSHARK_SIZE);
//握手结束
_next_step_func = [this]() {
_next_step_func = [this](const char *data, uint64_t len) {
//握手结束并且开始进入解析命令模式
handle_rtmp();
return handle_rtmp(data, len);
};
func();
return data + 1 + 2 * C1_HANDSHARK_SIZE;
}
////for server ////
void RtmpProtocol::handle_C0C1() {
if (_recv_data_buf.size() < 1 + C1_HANDSHARK_SIZE) {
const char * RtmpProtocol::handle_C0C1(const char *data, uint64_t len) {
if (len < 1 + C1_HANDSHARK_SIZE) {
//need more data!
return;
return nullptr;
}
if (_recv_data_buf[0] != HANDSHAKE_PLAINTEXT) {
if (data[0] != HANDSHAKE_PLAINTEXT) {
throw std::runtime_error("only plaintext[0x03] handshake supported");
}
if (memcmp(_recv_data_buf.data() + 5, "\x00\x00\x00\x00", 4) == 0) {
if (memcmp(data + 5, "\x00\x00\x00\x00", 4) == 0) {
//simple handsharke
handle_C1_simple();
handle_C1_simple(data);
} else {
#ifdef ENABLE_OPENSSL
//complex handsharke
handle_C1_complex();
handle_C1_complex(data);
#else
WarnL << "未打开ENABLE_OPENSSL宏复杂握手采用简单方式处理flash播放器可能无法播放";
handle_C1_simple();
handle_C1_simple(data);
#endif//ENABLE_OPENSSL
}
_recv_data_buf.erase(0, 1 + C1_HANDSHARK_SIZE);
return data + 1 + C1_HANDSHARK_SIZE;
}
void RtmpProtocol::handle_C1_simple(){
void RtmpProtocol::handle_C1_simple(const char *data){
//发送S0
char handshake_head = HANDSHAKE_PLAINTEXT;
onSendRawData(obtainBuffer(&handshake_head, 1));
@@ -338,18 +341,19 @@ void RtmpProtocol::handle_C1_simple(){
RtmpHandshake s1(0);
onSendRawData(obtainBuffer((char *) &s1, C1_HANDSHARK_SIZE));
//发送S2
onSendRawData(obtainBuffer(_recv_data_buf.data() + 1, C1_HANDSHARK_SIZE));
onSendRawData(obtainBuffer(data + 1, C1_HANDSHARK_SIZE));
//等待C2
_next_step_func = [this]() {
handle_C2();
_next_step_func = [this](const char *data, uint64_t len) {
//握手结束并且开始进入解析命令模式
return handle_C2(data, len);
};
}
#ifdef ENABLE_OPENSSL
void RtmpProtocol::handle_C1_complex(){
void RtmpProtocol::handle_C1_complex(const char *data){
//参考自:http://blog.csdn.net/win_lin/article/details/13006803
//skip c0,time,version
const char *c1_start = _recv_data_buf.data() + 1;
const char *c1_start = data + 1;
const char *schema_start = c1_start + 8;
char *digest_start;
try {
@@ -385,7 +389,7 @@ void RtmpProtocol::handle_C1_complex(){
// InfoL << "schema1";
} catch (std::exception &ex) {
// WarnL << "try rtmp complex schema1 failed:" << ex.what();
handle_C1_simple();
handle_C1_simple(data);
}
}
}
@@ -502,44 +506,43 @@ void RtmpProtocol::send_complex_S0S1S2(int schemeType,const string &digest){
memcpy((char *) &s2 + C1_HANDSHARK_SIZE - C1_DIGEST_SIZE, s2_digest.data(), C1_DIGEST_SIZE);
onSendRawData(obtainBuffer((char *) &s2, sizeof(s2)));
//等待C2
_next_step_func = [this]() {
handle_C2();
_next_step_func = [this](const char *data, uint64_t len) {
return handle_C2(data, len);
};
}
#endif //ENABLE_OPENSSL
void RtmpProtocol::handle_C2() {
if (_recv_data_buf.size() < C1_HANDSHARK_SIZE) {
const char* RtmpProtocol::handle_C2(const char *data, uint64_t len) {
if (len < C1_HANDSHARK_SIZE) {
//need more data!
return;
return nullptr;
}
_recv_data_buf.erase(0, C1_HANDSHARK_SIZE);
//握手结束,进入命令模式
if (!_recv_data_buf.empty()) {
handle_rtmp();
}
_next_step_func = [this]() {
handle_rtmp();
_next_step_func = [this](const char *data, uint64_t len) {
return handle_rtmp(data, len);
};
//握手结束,进入命令模式
return handle_rtmp(data + C1_HANDSHARK_SIZE, len - C1_HANDSHARK_SIZE);
}
static const size_t HEADER_LENGTH[] = {12, 8, 4, 1};
void RtmpProtocol::handle_rtmp() {
while (!_recv_data_buf.empty()) {
const char* RtmpProtocol::handle_rtmp(const char *data, uint64_t len) {
auto ptr = data;
while (len) {
int offset = 0;
uint8_t flags = _recv_data_buf[0];
uint8_t flags = ptr[0];
size_t header_len = HEADER_LENGTH[flags >> 6];
_now_chunk_id = flags & 0x3f;
switch (_now_chunk_id) {
case 0: {
//0 值表示二字节形式,并且 ID 范围 64 - 319
//(第二个字节 + 64)。
if (_recv_data_buf.size() < 2) {
if (len < 2) {
//need more data
return;
return ptr;
}
_now_chunk_id = 64 + (uint8_t) (_recv_data_buf[1]);
_now_chunk_id = 64 + (uint8_t) (ptr[1]);
offset = 1;
break;
}
@@ -547,11 +550,11 @@ void RtmpProtocol::handle_rtmp() {
case 1: {
//1 值表示三字节形式,并且 ID 范围为 64 - 65599
//((第三个字节) * 256 + 第二个字节 + 64)。
if (_recv_data_buf.size() < 3) {
if (len < 3) {
//need more data
return;
return ptr;
}
_now_chunk_id = 64 + ((uint8_t) (_recv_data_buf[2]) << 8) + (uint8_t) (_recv_data_buf[1]);
_now_chunk_id = 64 + ((uint8_t) (ptr[2]) << 8) + (uint8_t) (ptr[1]);
offset = 2;
break;
}
@@ -560,12 +563,12 @@ void RtmpProtocol::handle_rtmp() {
default : break;
}
if (_recv_data_buf.size() < header_len + offset) {
if (len < header_len + offset) {
//need more data
return;
return ptr;
}
RtmpHeader &header = *((RtmpHeader *) (_recv_data_buf.data() + offset));
RtmpHeader &header = *((RtmpHeader *) (ptr + offset));
auto &chunk_data = _map_chunk_data[_now_chunk_id];
chunk_data.chunk_id = _now_chunk_id;
switch (header_len) {
@@ -581,11 +584,11 @@ void RtmpProtocol::handle_rtmp() {
auto time_stamp = chunk_data.ts_field;
if (chunk_data.ts_field == 0xFFFFFF) {
if (_recv_data_buf.size() < header_len + offset + 4) {
if (len < header_len + offset + 4) {
//need more data
return;
return ptr;
}
time_stamp = load_be32(_recv_data_buf.data() + offset + header_len);
time_stamp = load_be32(ptr + offset + header_len);
offset += 4;
}
@@ -593,27 +596,29 @@ void RtmpProtocol::handle_rtmp() {
throw std::runtime_error("非法的bodySize");
}
auto iMore = min(_chunk_size_in, chunk_data.body_size - chunk_data.buffer.size());
if (_recv_data_buf.size() < header_len + offset + iMore) {
auto more = min(_chunk_size_in, (size_t)(chunk_data.body_size - chunk_data.buffer.size()));
if (len < header_len + offset + more) {
//need more data
return;
return ptr;
}
chunk_data.buffer.append(_recv_data_buf, header_len + offset, iMore);
_recv_data_buf.erase(0, header_len + offset + iMore);
chunk_data.buffer.append(ptr + header_len + offset, more);
ptr += header_len + offset + more;
len -= header_len + offset + more;
if (chunk_data.buffer.size() == chunk_data.body_size) {
//frame is ready
_now_stream_index = chunk_data.stream_index;
chunk_data.time_stamp = time_stamp + (chunk_data.is_abs_stamp ? 0 : chunk_data.time_stamp);
if (chunk_data.body_size) {
handle_rtmpChunk(chunk_data);
handle_chunk(chunk_data);
}
chunk_data.buffer.clear();
chunk_data.is_abs_stamp = false;
}
}
return ptr;
}
void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) {
void RtmpProtocol::handle_chunk(RtmpPacket& chunk_data) {
switch (chunk_data.type_id) {
case MSG_ACK: {
if (chunk_data.buffer.size() < 4) {
@@ -713,7 +718,7 @@ void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) {
case MSG_AGGREGATE: {
auto ptr = (uint8_t *) chunk_data.buffer.data();
auto ptr_tail = ptr + chunk_data.buffer.length();
auto ptr_tail = ptr + chunk_data.buffer.size();
while (ptr + 8 + 3 < ptr_tail) {
auto type = *ptr;
ptr += 1;
@@ -730,14 +735,13 @@ void RtmpProtocol::handle_rtmpChunk(RtmpPacket& chunk_data) {
break;
}
RtmpPacket sub_packet;
sub_packet.buffer.resize(size);
memcpy((char *) sub_packet.buffer.data(), ptr, size);
sub_packet.buffer.assign((char *)ptr, size);
sub_packet.type_id = type;
sub_packet.body_size = size;
sub_packet.time_stamp = ts;
sub_packet.stream_index = chunk_data.stream_index;
sub_packet.chunk_id = chunk_data.chunk_id;
handle_rtmpChunk(sub_packet);
handle_chunk(sub_packet);
ptr += size;
}
break;

View File

@@ -21,23 +21,24 @@
#include "Util/TimeTicker.h"
#include "Network/Socket.h"
#include "Util/ResourcePool.h"
#include "Http/HttpRequestSplitter.h"
using namespace std;
using namespace toolkit;
namespace mediakit {
class RtmpProtocol {
class RtmpProtocol : public HttpRequestSplitter{
public:
RtmpProtocol();
virtual ~RtmpProtocol();
void onParseRtmp(const char *data, int size);
void onParseRtmp(const char *data, uint64_t size);
//作为客户端发送c0c1等待s0s1s2并且回调
void startClientSession(const function<void()> &cb);
protected:
virtual void onSendRawData(const Buffer::Ptr &buffer) = 0;
virtual void onSendRawData(Buffer::Ptr buffer) = 0;
virtual void onRtmpChunk(RtmpPacket &chunk_data) = 0;
virtual void onStreamBegin(uint32_t stream_index){
_stream_index = stream_index;
@@ -45,6 +46,11 @@ protected:
virtual void onStreamEof(uint32_t stream_index){};
virtual void onStreamDry(uint32_t stream_index){};
protected:
//// HttpRequestSplitter override ////
int64_t onRecvHeader(const char *data,uint64_t len) override { return 0; }
const char *onSearchPacketTail(const char *data,uint64_t len) override;
protected:
void reset();
BufferRaw::Ptr obtainBuffer();
@@ -66,20 +72,20 @@ protected:
void sendRtmp(uint8_t type, uint32_t stream_index, const Buffer::Ptr &buffer, uint32_t stamp, int chunk_id);
private:
void handle_S0S1S2(const function<void()> &func);
void handle_C0C1();
void handle_C1_simple();
void handle_C1_simple(const char *data);
#ifdef ENABLE_OPENSSL
void handle_C1_complex();
void handle_C1_complex(const char *data);
string get_C1_digest(const uint8_t *ptr,char **digestPos);
string get_C1_key(const uint8_t *ptr);
void check_C1_Digest(const string &digest,const string &data);
void send_complex_S0S1S2(int schemeType,const string &digest);
#endif //ENABLE_OPENSSL
void handle_C2();
void handle_rtmp();
void handle_rtmpChunk(RtmpPacket &chunk_data);
const char* handle_S0S1S2(const char *data, uint64_t len, const function<void()> &func);
const char* handle_C0C1(const char *data, uint64_t len);
const char* handle_C2(const char *data, uint64_t len);
const char* handle_rtmp(const char *data, uint64_t len);
void handle_chunk(RtmpPacket &chunk_data);
protected:
int _send_req_id = 0;
@@ -100,8 +106,7 @@ private:
uint32_t _bandwidth = 2500000;
uint8_t _band_limit_type = 2;
//////////Rtmp parser//////////
string _recv_data_buf;
function<void()> _next_step_func;
function<const char * (const char *data, uint64_t len)> _next_step_func;
////////////Chunk////////////
unordered_map<int, RtmpPacket> _map_chunk_data;
};

View File

@@ -119,9 +119,6 @@ void RtmpPusher::onConnect(const SockException &err){
onPublishResult(err, false);
return;
}
//推流器不需要多大的接收缓存,节省内存占用
getSock()->setReadBuffer(std::make_shared<BufferRaw>(1 * 1024));
weak_ptr<RtmpPusher> weak_self = dynamic_pointer_cast<RtmpPusher>(shared_from_this());
startClientSession([weak_self]() {
auto strong_self = weak_self.lock();

View File

@@ -43,8 +43,8 @@ protected:
//for RtmpProtocol override
void onRtmpChunk(RtmpPacket &chunk_data) override;
void onSendRawData(const Buffer::Ptr &buffer) override{
send(buffer);
void onSendRawData(Buffer::Ptr buffer) override{
send(std::move(buffer));
}
private:

View File

@@ -17,8 +17,6 @@ RtmpSession::RtmpSession(const Socket::Ptr &sock) : TcpSession(sock) {
DebugP(this);
GET_CONFIG(uint32_t,keep_alive_sec,Rtmp::kKeepAliveSecond);
sock->setSendTimeOutSecond(keep_alive_sec);
//起始接收buffer缓存设置为4K节省内存
sock->setReadBuffer(std::make_shared<BufferRaw>(4 * 1024));
}
RtmpSession::~RtmpSession() {
@@ -151,9 +149,6 @@ void RtmpSession::onCmd_publish(AMFDecoder &dec) {
_publisher_src->setListener(dynamic_pointer_cast<MediaSourceEvent>(shared_from_this()));
//设置转协议
_publisher_src->setProtocolTranslation(enableHls, enableMP4);
//如果是rtmp推流客户端那么加大TCP接收缓存这样能提升接收性能
getSock()->setReadBuffer(std::make_shared<BufferRaw>(256 * 1024));
setSocketFlags();
};

View File

@@ -56,9 +56,9 @@ private:
void setMetaData(AMFDecoder &dec);
void onSendMedia(const RtmpPacket::Ptr &pkt);
void onSendRawData(const Buffer::Ptr &buffer) override{
void onSendRawData(Buffer::Ptr buffer) override{
_total_bytes += buffer->size();
send(buffer);
send(std::move(buffer));
}
void onRtmpChunk(RtmpPacket &chunk_data) override;

View File

@@ -539,7 +539,7 @@ std::string AMFDecoder::load<std::string>() {
if (pos + str_len > buf.size()) {
throw std::runtime_error("Not enough data");
}
std::string s(buf, pos, str_len);
std::string s = buf.substr(pos, str_len);
pos += str_len;
return s;
}
@@ -612,7 +612,7 @@ std::string AMFDecoder::load_key() {
if (pos + str_len > buf.size()) {
throw std::runtime_error("Not enough data");
}
std::string s(buf, pos, str_len);
std::string s = buf.substr(pos, str_len);
pos += str_len;
return s;
@@ -680,7 +680,7 @@ AMFValue AMFDecoder::load_arr() {
return object;
}
AMFDecoder::AMFDecoder(const std::string &buf_in, size_t pos_in, int version_in) :
AMFDecoder::AMFDecoder(const BufferLikeString &buf_in, size_t pos_in, int version_in) :
buf(buf_in), pos(pos_in), version(version_in) {
}

View File

@@ -18,7 +18,9 @@
#include <map>
#include <stdexcept>
#include <functional>
#include "Network/Buffer.h"
using namespace std;
using namespace toolkit;
enum AMFType {
AMF_NUMBER,
@@ -81,7 +83,7 @@ private:
class AMFDecoder {
public:
AMFDecoder(const std::string &buf, size_t pos, int version = 0);
AMFDecoder(const BufferLikeString &buf, size_t pos, int version = 0);
template<typename TP>
TP load();
private:
@@ -92,7 +94,7 @@ private:
uint8_t front();
uint8_t pop_front();
private:
const std::string &buf;
const BufferLikeString &buf;
size_t pos;
int version;
};