mirror of
https://github.com/ZLMediaKit/ZLMediaKit.git
synced 2026-06-20 07:12:21 +08:00
for srt push fix ack paramter error result in pkt lost
This commit is contained in:
@@ -15,7 +15,10 @@ static std::atomic<uint32_t> s_srt_socket_id_generate{125};
|
||||
SrtTransport::SrtTransport(const EventPoller::Ptr &poller)
|
||||
: _poller(poller) {
|
||||
_start_timestamp = SteadyClock::now();
|
||||
_socket_id = s_srt_socket_id_generate.fetch_add(1);
|
||||
_socket_id = s_srt_socket_id_generate.fetch_add(1);\
|
||||
_pkt_recv_rate_context = std::make_shared<PacketRecvRateContext>(_start_timestamp);
|
||||
_recv_rate_context = std::make_shared<RecvRateContext>(_start_timestamp);
|
||||
_estimated_link_capacity_context = std::make_shared<EstimatedLinkCapacityContext>(_start_timestamp);
|
||||
}
|
||||
|
||||
SrtTransport::~SrtTransport(){
|
||||
@@ -65,14 +68,14 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
|
||||
s_control_functions.emplace(ControlPacket::PEERERROR, &SrtTransport::handlePeerError);
|
||||
s_control_functions.emplace(ControlPacket::USERDEFINEDTYPE, &SrtTransport::handleUserDefinedType);
|
||||
});
|
||||
auto now = SteadyClock::now();
|
||||
_now = SteadyClock::now();
|
||||
// 处理srt数据
|
||||
if (DataPacket::isDataPacket(buf, len)) {
|
||||
uint32_t socketId = DataPacket::getSocketID(buf,len);
|
||||
if(socketId == _socket_id){
|
||||
_pkt_recv_rate_context.inputPacket(now);
|
||||
_estimated_link_capacity_context.inputPacket(now);
|
||||
_recv_rate_context.inputPacket(now, len);
|
||||
_pkt_recv_rate_context->inputPacket(_now);
|
||||
_estimated_link_capacity_context->inputPacket(_now);
|
||||
_recv_rate_context->inputPacket(_now, len);
|
||||
|
||||
handleDataPacket(buf, len, addr);
|
||||
}else{
|
||||
@@ -87,9 +90,9 @@ void SrtTransport::inputSockData(uint8_t *buf, int len, struct sockaddr_storage
|
||||
switchToOtherTransport(buf,len,socketId,addr);
|
||||
return;
|
||||
}
|
||||
_pkt_recv_rate_context.inputPacket(now);
|
||||
_estimated_link_capacity_context.inputPacket(now);
|
||||
_recv_rate_context.inputPacket(now, len);
|
||||
_pkt_recv_rate_context->inputPacket(_now);
|
||||
_estimated_link_capacity_context->inputPacket(_now);
|
||||
_recv_rate_context->inputPacket(_now, len);
|
||||
|
||||
auto it = s_control_functions.find(type);
|
||||
if (it == s_control_functions.end()) {
|
||||
@@ -165,7 +168,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
|
||||
TraceL << getIdentifier() << " CONCLUSION Phase ";
|
||||
HandshakePacket::Ptr res = std::make_shared<HandshakePacket>();
|
||||
res->dst_socket_id = _peer_socket_id;
|
||||
res->timestamp = DurationCountMicroseconds(SteadyClock::now() - _start_timestamp);
|
||||
res->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
|
||||
res->mtu = _mtu;
|
||||
res->max_flow_window_size = _max_window_size;
|
||||
res->initial_packet_sequence_number = _init_seq_number;
|
||||
@@ -194,6 +197,7 @@ void SrtTransport::handleHandshakeConclusion(HandshakePacket &pkt, struct sockad
|
||||
TraceL << getIdentifier() << " CONCLUSION handle repeate ";
|
||||
sendControlPacket(_handleshake_res, true);
|
||||
}
|
||||
_last_ack_pkt_seq_num = _init_seq_number;
|
||||
}
|
||||
void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||
HandshakePacket pkt;
|
||||
@@ -206,21 +210,29 @@ void SrtTransport::handleHandshake(uint8_t *buf, int len, struct sockaddr_storag
|
||||
}else{
|
||||
WarnL<<" not support handshake type = "<< pkt.handshake_type;
|
||||
}
|
||||
_ack_ticker.resetTime();
|
||||
_nak_ticker.resetTime();
|
||||
_ack_ticker.resetTime(_now);
|
||||
_nak_ticker.resetTime(_now);
|
||||
}
|
||||
void SrtTransport::handleKeeplive(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||
TraceL;
|
||||
sendKeepLivePacket();
|
||||
}
|
||||
|
||||
void SrtTransport::sendKeepLivePacket(){
|
||||
KeepLivePacket::Ptr pkt = std::make_shared<KeepLivePacket>();
|
||||
pkt->dst_socket_id = _peer_socket_id;
|
||||
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
|
||||
pkt->storeToData();
|
||||
sendControlPacket(pkt,true);
|
||||
}
|
||||
void SrtTransport::handleACK(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||
TraceL;
|
||||
auto now = SteadyClock::now();
|
||||
ACKPacket ack;
|
||||
ack.loadFromData(buf,len);
|
||||
|
||||
ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
|
||||
pkt->dst_socket_id = _peer_socket_id;
|
||||
pkt->timestamp = DurationCountMicroseconds(now -_start_timestamp);
|
||||
pkt->timestamp = DurationCountMicroseconds(_now -_start_timestamp);
|
||||
pkt->ack_number = ack.ack_number;
|
||||
pkt->storeToData();
|
||||
sendControlPacket(pkt,true);
|
||||
@@ -247,13 +259,12 @@ void SrtTransport::handleUserDefinedType(uint8_t *buf, int len, struct sockaddr_
|
||||
|
||||
void SrtTransport::handleACKACK(uint8_t *buf, int len, struct sockaddr_storage *addr){
|
||||
//TraceL;
|
||||
auto now = SteadyClock::now();
|
||||
ACKACKPacket::Ptr pkt = std::make_shared<ACKACKPacket>();
|
||||
pkt->loadFromData(buf,len);
|
||||
|
||||
uint32_t rtt = DurationCountMicroseconds(now - _ack_send_timestamp[pkt->ack_number]);
|
||||
_rtt_variance = 3*_rtt_variance/4+abs(_rtt - rtt);
|
||||
_rtt = 7*rtt/8+_rtt/8;
|
||||
uint32_t rtt = DurationCountMicroseconds(_now - _ack_send_timestamp[pkt->ack_number]);
|
||||
_rtt_variance = (3*_rtt_variance+abs(_rtt - rtt))/4;
|
||||
_rtt = (7*rtt+_rtt)/8;
|
||||
|
||||
_ack_send_timestamp.erase(pkt->ack_number);
|
||||
}
|
||||
@@ -268,31 +279,30 @@ void SrtTransport::sendACKPacket() {
|
||||
}
|
||||
|
||||
ACKPacket::Ptr pkt=std::make_shared<ACKPacket>();
|
||||
auto now = SteadyClock::now();
|
||||
pkt->dst_socket_id = _peer_socket_id;
|
||||
pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp);
|
||||
pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
|
||||
pkt->ack_number = ++_ack_number_count;
|
||||
pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
|
||||
pkt->rtt = _rtt;
|
||||
pkt->rtt_variance = _rtt_variance;
|
||||
pkt->available_buf_size = _recv_buf->getAvailableBufferSize();
|
||||
pkt->pkt_recv_rate = _pkt_recv_rate_context.getPacketRecvRate();
|
||||
pkt->estimated_link_capacity = _estimated_link_capacity_context.getEstimatedLinkCapacity();
|
||||
pkt->recv_rate = _recv_rate_context.getRecvRate();
|
||||
pkt->pkt_recv_rate = _pkt_recv_rate_context->getPacketRecvRate();
|
||||
pkt->estimated_link_capacity = _estimated_link_capacity_context->getEstimatedLinkCapacity();
|
||||
pkt->recv_rate = _recv_rate_context->getRecvRate();
|
||||
pkt->storeToData();
|
||||
_ack_send_timestamp[pkt->ack_number] = now;
|
||||
_ack_send_timestamp[pkt->ack_number] = _now;
|
||||
_last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number;
|
||||
sendControlPacket(pkt,true);
|
||||
TraceL<<"send ack";
|
||||
//TraceL<<"send ack "<<pkt->dump();
|
||||
}
|
||||
void SrtTransport::sendLightACKPacket() {
|
||||
if(_last_ack_pkt_seq_num == _recv_buf->getExpectedSeq()){
|
||||
return;
|
||||
}
|
||||
ACKPacket::Ptr pkt=std::make_shared<ACKPacket>();
|
||||
auto now = SteadyClock::now();
|
||||
|
||||
pkt->dst_socket_id = _peer_socket_id;
|
||||
pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp);
|
||||
pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
|
||||
pkt->ack_number = 0;
|
||||
pkt->last_ack_pkt_seq_number = _recv_buf->getExpectedSeq();
|
||||
pkt->rtt = 0;
|
||||
@@ -304,15 +314,14 @@ void SrtTransport::sendLightACKPacket() {
|
||||
pkt->storeToData();
|
||||
_last_ack_pkt_seq_num = pkt->last_ack_pkt_seq_number;
|
||||
sendControlPacket(pkt,true);
|
||||
TraceL<<"send light ack";
|
||||
TraceL<<"send ack "<<pkt->dump();
|
||||
}
|
||||
|
||||
void SrtTransport::sendNAKPacket(std::list<PacketQueue::LostPair>& lost_list){
|
||||
NAKPacket::Ptr pkt = std::make_shared<NAKPacket>();
|
||||
auto now = SteadyClock::now();
|
||||
|
||||
pkt->dst_socket_id = _peer_socket_id;
|
||||
pkt->timestamp = DurationCountMicroseconds(now - _start_timestamp);
|
||||
pkt->timestamp = DurationCountMicroseconds(_now - _start_timestamp);
|
||||
pkt->lost_list = lost_list;
|
||||
|
||||
pkt->storeToData();
|
||||
@@ -325,7 +334,7 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
|
||||
pkt->loadFromData(buf,len);
|
||||
|
||||
//TraceL<<" seq="<< pkt->packet_seq_number<<" ts="<<pkt->timestamp<<" size="<<pkt->payloadSize()<<\
|
||||
" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
|
||||
//" PP="<<(int)pkt->PP<<" O="<<(int)pkt->O<<" kK="<<(int)pkt->KK<<" R="<<(int)pkt->R;
|
||||
#if 1
|
||||
_recv_buf->inputPacket(pkt);
|
||||
#else
|
||||
@@ -344,19 +353,19 @@ void SrtTransport::handleDataPacket(uint8_t *buf, int len, struct sockaddr_stora
|
||||
onSRTData(std::move(data),addr);
|
||||
}
|
||||
|
||||
auto nak_interval = (_rtt+_rtt_variance*4)/2/1000;
|
||||
if(_nak_ticker.elapsedTime()>20 && _nak_ticker.elapsedTime()>nak_interval){
|
||||
auto nak_interval = (_rtt+_rtt_variance*4)/2;
|
||||
if(_nak_ticker.elapsedTime(_now)>20*1000 && _nak_ticker.elapsedTime(_now)>nak_interval){
|
||||
auto lost = _recv_buf->getLostSeq();
|
||||
if(!lost.empty()){
|
||||
sendNAKPacket(lost);
|
||||
//TraceL<<"send NAK";
|
||||
}
|
||||
_nak_ticker.resetTime();
|
||||
_nak_ticker.resetTime(_now);
|
||||
}
|
||||
|
||||
if(_ack_ticker.elapsedTime()>=10){
|
||||
if(_ack_ticker.elapsedTime(_now)>10*1000){
|
||||
_light_ack_pkt_count = 0;
|
||||
_ack_ticker.resetTime();
|
||||
_ack_ticker.resetTime(_now);
|
||||
// send a ack per 10 ms for receiver
|
||||
sendACKPacket();
|
||||
}else{
|
||||
|
||||
Reference in New Issue
Block a user