支持http-ts/websocket-ts直播

This commit is contained in:
xiongziliang
2020-09-20 00:21:46 +08:00
parent f84981dc75
commit c76930e3cd
9 changed files with 371 additions and 80 deletions

View File

@@ -8,15 +8,9 @@
* may be found in the AUTHORS file in the root of the source tree.
*/
#if !defined(_WIN32)
#include <dirent.h>
#endif //!defined(_WIN32)
#include <stdio.h>
#include <sys/stat.h>
#include <algorithm>
#include <iomanip>
#include "Common/config.h"
#include "strCoding.h"
#include "HttpSession.h"
@@ -96,10 +90,10 @@ void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
}
void HttpSession::onError(const SockException& err) {
if(_is_flv_stream){
if(_is_live_stream){
uint64_t duration = _ticker.createdTime()/1000;
//flv播放器
WarnP(this) << "FLV播放器("
//flv/ts播放器
WarnP(this) << "FLV/TS播放器("
<< _mediaInfo._vhost << "/"
<< _mediaInfo._app << "/"
<< _mediaInfo._streamid
@@ -107,8 +101,8 @@ void HttpSession::onError(const SockException& err) {
<< ",耗时(s):" << duration;
GET_CONFIG(uint32_t,iFlowThreshold,General::kFlowThreshold);
if(_ui64TotalBytes > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _ui64TotalBytes, duration , true, static_cast<SockInfo &>(*this));
if(_total_bytes_usage > iFlowThreshold * 1024){
NoticeCenter::Instance().emitEvent(Broadcast::kBroadcastFlowReport, _mediaInfo, _total_bytes_usage, duration , true, static_cast<SockInfo &>(*this));
}
return;
}
@@ -135,8 +129,7 @@ bool HttpSession::checkWebSocket(){
if (Sec_WebSocket_Key.empty()) {
return false;
}
auto Sec_WebSocket_Accept = encodeBase64(
SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
auto Sec_WebSocket_Accept = encodeBase64(SHA1::encode_bin(Sec_WebSocket_Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"));
KeyValue headerOut;
headerOut["Upgrade"] = "websocket";
@@ -147,17 +140,23 @@ bool HttpSession::checkWebSocket(){
}
auto res_cb = [this, headerOut]() {
_flv_over_websocket = true;
_live_over_websocket = true;
sendResponse("101 Switching Protocols", false, nullptr, headerOut, nullptr, true);
};
//判断是否为websocket-flv
if (checkLiveFlvStream(res_cb)) {
if (checkLiveStreamFlv(res_cb)) {
//这里是websocket-flv直播请求
return true;
}
//如果checkLiveFlvStream返回false,则代表不是websocket-flv而是普通的websocket连接
//判断是否为websocket-ts
if (checkLiveStreamTS(res_cb)) {
//这里是websocket-ts直播请求
return true;
}
//这是普通的websocket连接
if (!onWebSocketConnect(_parser)) {
sendResponse("501 Not Implemented", true, nullptr, headerOut);
return true;
@@ -166,75 +165,63 @@ bool HttpSession::checkWebSocket(){
return true;
}
//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。
bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
auto pos = strrchr(_parser.Url().data(),'.');
if(!pos){
//未找到".flv"后缀
bool HttpSession::checkLiveStream(const string &schema, const string &url_suffix, const function<void(const MediaSource::Ptr &src)> &cb){
auto pos = strrchr(_parser.Url().data(), '.');
if (!pos) {
//未找到后缀
return false;
}
if(strcasecmp(pos,".flv") != 0){
//未找到".flv"后缀
if (strcasecmp(pos, url_suffix.data()) != 0) {
//未找到直播流后缀
return false;
}
//这是个.flv的流
_mediaInfo.parse(string(RTMP_SCHEMA) + "://" + _parser["Host"] + _parser.FullUrl());
if(_mediaInfo._app.empty() || _mediaInfo._streamid.size() < 5){
//这是个符合后缀的直播的流
_mediaInfo.parse(schema + "://" + _parser["Host"] + _parser.FullUrl());
if (_mediaInfo._app.empty() || _mediaInfo._streamid.size() < url_suffix.size() + 1) {
//url不合法
return false;
}
_mediaInfo._streamid.erase(_mediaInfo._streamid.size() - 4);//去除.flv后缀
bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
//去除后缀
bool close_flag = !strcasecmp(_parser["Connection"].data(), "close");
//流id去除后缀
_mediaInfo._streamid.erase(_mediaInfo._streamid.size() - url_suffix.size());
weak_ptr<HttpSession> weak_self = dynamic_pointer_cast<HttpSession>(shared_from_this());
//鉴权结果回调
auto onRes = [cb, weakSelf, bClose](const string &err){
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
auto onRes = [cb, weak_self, close_flag](const string &err) {
auto strong_self = weak_self.lock();
if (!strong_self) {
//本对象已经销毁
return;
}
if(!err.empty()){
if (!err.empty()) {
//播放鉴权失败
strongSelf->sendResponse("401 Unauthorized", bClose, nullptr, KeyValue(), std::make_shared<HttpStringBody>(err));
strong_self->sendResponse("401 Unauthorized", close_flag, nullptr, KeyValue(), std::make_shared<HttpStringBody>(err));
return;
}
//异步查找rtmp
MediaSource::findAsync(strongSelf->_mediaInfo, strongSelf, [weakSelf, bClose, cb](const MediaSource::Ptr &src) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//异步查找直播
MediaSource::findAsync(strong_self->_mediaInfo, strong_self, [weak_self, close_flag, cb](const MediaSource::Ptr &src) {
auto strong_self = weak_self.lock();
if (!strong_self) {
//本对象已经销毁
return;
}
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
if (!rtmp_src) {
if (!src) {
//未找到该流
strongSelf->sendNotFound(bClose);
strong_self->sendNotFound(close_flag);
return;
}
if (!cb) {
//找到rtmp源发送http头负载后续发送
strongSelf->sendResponse("200 OK", false, "video/x-flv", KeyValue(), nullptr, true);
} else {
//自定义发送http头
cb();
}
//http-flv直播牺牲延时提升发送性能
strongSelf->setSocketFlags();
strongSelf->start(strongSelf->getPoller(), rtmp_src);
strongSelf->_is_flv_stream = true;
strong_self->_is_live_stream = true;
//触发回调
cb(src);
});
};
Broadcast::AuthInvoker invoker = [weakSelf, onRes](const string &err) {
auto strongSelf = weakSelf.lock();
Broadcast::AuthInvoker invoker = [weak_self, onRes](const string &err) {
auto strongSelf = weak_self.lock();
if (!strongSelf) {
return;
}
@@ -251,34 +238,91 @@ bool HttpSession::checkLiveFlvStream(const function<void()> &cb){
return true;
}
//http-ts 链接格式:http://vhost-url:port/app/streamid.ts?key1=value1&key2=value2
//如果url(除去?以及后面的参数)后缀是.ts,那么表明该url是一个http-ts直播。
bool HttpSession::checkLiveStreamTS(const function<void()> &cb){
return checkLiveStream(TS_SCHEMA, ".ts", [this, cb](const MediaSource::Ptr &src) {
auto ts_src = dynamic_pointer_cast<TSMediaSource>(src);
assert(ts_src);
if (!cb) {
//找到源发送http头负载后续发送
sendResponse("200 OK", false, "video/mp2t", KeyValue(), nullptr, true);
} else {
//自定义发送http头
cb();
}
//直播牺牲延时提升发送性能
setSocketFlags();
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
_ts_reader = ts_src->getRing()->attach(getPoller());
_ts_reader->setReadCB([weakSelf](const TSMediaSource::RingDataType &ts_list) {
auto strongSelf = weakSelf.lock();
if (!strongSelf) {
//本对象已经销毁
return;
}
int i = 0;
int size = ts_list->size();
strongSelf->setSendFlushFlag(false);
ts_list->for_each([&](const TSPacket::Ptr &ts) {
strongSelf->onWrite(ts, ++i == size);
});
});
});
}
//http-flv 链接格式:http://vhost-url:port/app/streamid.flv?key1=value1&key2=value2
//如果url(除去?以及后面的参数)后缀是.flv,那么表明该url是一个http-flv直播。
bool HttpSession::checkLiveStreamFlv(const function<void()> &cb){
return checkLiveStream(RTMP_SCHEMA, ".flv", [this, cb](const MediaSource::Ptr &src) {
auto rtmp_src = dynamic_pointer_cast<RtmpMediaSource>(src);
assert(rtmp_src);
if (!cb) {
//找到源发送http头负载后续发送
sendResponse("200 OK", false, "video/x-flv", KeyValue(), nullptr, true);
} else {
//自定义发送http头
cb();
}
//直播牺牲延时提升发送性能
setSocketFlags();
start(getPoller(), rtmp_src);
});
}
void HttpSession::Handle_Req_GET(int64_t &content_len) {
Handle_Req_GET_l(content_len, true);
}
void HttpSession::Handle_Req_GET_l(int64_t &content_len, bool sendBody) {
//先看看是否为WebSocket请求
if(checkWebSocket()){
if (checkWebSocket()) {
content_len = -1;
_contentCallBack = [this](const char *data,uint64_t len){
WebSocketSplitter::decode((uint8_t *)data,len);
_contentCallBack = [this](const char *data, uint64_t len) {
WebSocketSplitter::decode((uint8_t *) data, len);
//_contentCallBack是可持续的后面还要处理后续数据
return true;
};
return;
}
if(emitHttpEvent(false)){
if (emitHttpEvent(false)) {
//拦截http api事件
return;
}
if(checkLiveFlvStream()){
if (checkLiveStreamFlv()) {
//拦截http-flv播放器
return;
}
bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
if (checkLiveStreamTS()) {
//拦截http-ts播放器
return;
}
bool bClose = !strcasecmp(_parser["Connection"].data(),"close");
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
HttpFileManager::onAccessPath(*this, _parser, [weakSelf, bClose](const string &status_code, const string &content_type,
const StrCaseMap &responseHeader, const HttpBody::Ptr &body) {
@@ -623,26 +667,26 @@ void HttpSession::onWrite(const Buffer::Ptr &buffer, bool flush) {
}
_ticker.resetTime();
if(!_flv_over_websocket){
_ui64TotalBytes += buffer->size();
if (!_live_over_websocket) {
_total_bytes_usage += buffer->size();
send(buffer);
}else{
} else {
WebSocketHeader header;
header._fin = true;
header._reserved = 0;
header._opcode = WebSocketHeader::BINARY;
header._mask_flag = false;
WebSocketSplitter::encode(header,buffer);
WebSocketSplitter::encode(header, buffer);
}
if(flush){
if (flush) {
//本次刷新缓存后,下次不用刷新缓存
HttpSession::setSendFlushFlag(false);
}
}
void HttpSession::onWebSocketEncodeData(const Buffer::Ptr &buffer){
_ui64TotalBytes += buffer->size();
_total_bytes_usage += buffer->size();
send(buffer);
}