适配ZLToolKit develop分支

This commit is contained in:
xiongziliang
2018-02-23 15:36:51 +08:00
parent b0a64d1e6f
commit f1b4a196c6
30 changed files with 137 additions and 125 deletions

View File

@@ -110,7 +110,7 @@ void HttpClient::onConnect(const SockException &ex) {
send(_body);
}
}
void HttpClient::onRecv(const Socket::Buffer::Ptr &pBuf) {
void HttpClient::onRecv(const Buffer::Ptr &pBuf) {
onRecvBytes(pBuf->data(),pBuf->size());
}

View File

@@ -111,7 +111,7 @@ protected:
virtual void onDisconnect(const SockException &ex){}
private:
virtual void onConnect(const SockException &ex) override;
virtual void onRecv(const Socket::Buffer::Ptr &pBuf) override;
virtual void onRecv(const Buffer::Ptr &pBuf) override;
virtual void onErr(const SockException &ex) override;
//send

View File

@@ -44,6 +44,7 @@ HttpDownloader::~HttpDownloader() {
void HttpDownloader::startDownload(const string& url, const string& filePath,bool bAppend,uint32_t timeOutSecond) {
_filePath = filePath;
_timeOutSecond = timeOutSecond;
_downloadTicker.resetTime();
if(_filePath.empty()){
_filePath = exeDir() + "HttpDownloader/" + MD5(url).hexdigest();
}
@@ -67,7 +68,8 @@ void HttpDownloader::startDownload(const string& url, const string& filePath,boo
}
void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& headers) {
if(status != "200" && status != "206"){
_downloadTicker.resetTime();
if(status != "200" && status != "206"){
//失败
shutdown();
closeFile();
@@ -81,7 +83,8 @@ void HttpDownloader::onResponseHeader(const string& status,const HttpHeader& hea
}
void HttpDownloader::onResponseBody(const char* buf, size_t size, size_t recvedSize, size_t totalSize) {
if(_saveFile){
_downloadTicker.resetTime();
if(_saveFile){
fwrite(buf,size,1,_saveFile);
}
}
@@ -126,7 +129,7 @@ void HttpDownloader::closeFile() {
}
void HttpDownloader::onManager(){
if(elapsedTime() > _timeOutSecond * 1000){
if(_downloadTicker.elapsedTime() > _timeOutSecond * 1000){
//超时
onDisconnect(SockException(Err_timeout,"download timeout"));
shutdown();

View File

@@ -62,6 +62,7 @@ private:
onDownloadResult _onResult;
uint32_t _timeOutSecond;
bool _bDownloadSuccess = false;
Ticker _downloadTicker;
};
} /* namespace Http */

View File

@@ -101,7 +101,7 @@ get_mime_type(const char* name) {
HttpSession::HttpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock) :
TcpLimitedSession(pTh, pSock) {
TcpSession(pTh, pSock) {
GET_CONFIG_AND_REGISTER(string,rootPath,Config::Http::kRootPath);
m_strPath = rootPath;
@@ -115,7 +115,7 @@ HttpSession::~HttpSession() {
//DebugL;
}
void HttpSession::onRecv(const Socket::Buffer::Ptr &pBuf) {
void HttpSession::onRecv(const Buffer::Ptr &pBuf) {
onRecv(pBuf->data(),pBuf->size());
}
void HttpSession::onRecv(const char *data,int size){
@@ -256,6 +256,11 @@ inline bool HttpSession::checkLiveFlvStream(){
});
//开始发送rtmp负载
//关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(sock_flags);
m_pRingReader = mediaSrc->getRing()->attach();
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
m_pRingReader->setReadCB([weakSelf](const RtmpPacket::Ptr &pkt){
@@ -392,9 +397,9 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
GET_CONFIG_AND_REGISTER(uint32_t,sendBufSize,Config::Http::kSendBufSize);
//不允许主动丢包
sock->setShouldDropPacket(false);
_sock->setShouldDropPacket(false);
//缓存大小为两个包,太大可能导致发送时间太长从而超时
sock->setSendPktSize(2);
_sock->setSendPktSize(2);
weak_ptr<HttpSession> weakSelf = dynamic_pointer_cast<HttpSession>(shared_from_this());
auto onFlush = [pFilePtr,bClose,weakSelf,piLeft]() {
TimeTicker();
@@ -403,7 +408,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
//更新超时定时器
strongSelf->m_ticker.resetTime();
//从循环池获取一个内存片
auto sendBuf = strongSelf->sock->obtainBuffer();
auto sendBuf = strongSelf->obtainBuffer();
sendBuf->setCapacity(sendBufSize);
//本次需要读取文件字节数
int64_t iReq = MIN(sendBufSize,*piLeft);
@@ -421,8 +426,8 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
//InfoL << "send complete!" << iRead << " " << iReq << " " << *piLeft;
if(iRead>0) {
sendBuf->setSize(iRead);
strongSelf->sock->setSendPktSize(3);//强制写入socket缓存
strongSelf->sock->send(sendBuf,sock_flags);
strongSelf->_sock->setSendPktSize(3);//强制写入socket缓存
strongSelf->send(sendBuf);
}
if(bClose) {
strongSelf->shutdown();
@@ -431,7 +436,7 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
}
//文件还未读完
sendBuf->setSize(iRead);
int iSent = strongSelf->sock->send(sendBuf,sock_flags);
int iSent = strongSelf->send(sendBuf);
if(iSent == -1) {
//send error
//InfoL << "send error";
@@ -451,9 +456,10 @@ inline HttpSession::HttpCode HttpSession::Handle_Req_GET() {
return false;
};
//关闭tcp_nodelay ,优化性能
SockUtil::setNoDelay(sock->rawFD(),false);
onFlush();
sock->setOnFlush(onFlush);
SockUtil::setNoDelay(_sock->rawFD(),false);
(*this) << SocketFlags(sock_flags);
onFlush();
_sock->setOnFlush(onFlush);
return Http_success;
}
@@ -690,7 +696,7 @@ public:
#pragma pack(pop)
#endif // defined(_WIN32)
class BufferRtmp : public Socket::Buffer{
class BufferRtmp : public Buffer{
public:
typedef std::shared_ptr<BufferRtmp> Ptr;
BufferRtmp(const RtmpPacket::Ptr & pkt):_rtmp(pkt){}
@@ -708,28 +714,28 @@ private:
void HttpSession::sendRtmp(const RtmpPacket::Ptr &pkt, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
sock->send((char *)&size,4,sock_flags);//send PreviousTagSize
send((char *)&size,4);//send PreviousTagSize
RtmpTagHeader header;
header.type = pkt->typeId;
set_be24(header.data_size, pkt->strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
sock->send((char *)&header, sizeof(header),sock_flags);//send tag header
sock->send(std::make_shared<BufferRtmp>(pkt),sock_flags);//send tag data
send((char *)&header, sizeof(header));//send tag header
send(std::make_shared<BufferRtmp>(pkt));//send tag data
m_previousTagSize += (pkt->strBuf.size() + sizeof(header) + 4);
m_ticker.resetTime();
}
void HttpSession::sendRtmp(uint8_t ui8Type, const std::string& strBuf, uint32_t ui32TimeStamp) {
auto size = htonl(m_previousTagSize);
sock->send((char *)&size,4,sock_flags);//send PreviousTagSize
send((char *)&size,4);//send PreviousTagSize
RtmpTagHeader header;
header.type = ui8Type;
set_be24(header.data_size, strBuf.size());
header.timestamp_ex = (uint8_t) ((ui32TimeStamp >> 24) & 0xff);
set_be24(header.timestamp,ui32TimeStamp & 0xFFFFFF);
sock->send((char *)&header, sizeof(header),sock_flags);//send tag header
sock->send(strBuf,sock_flags);//send tag data
send((char *)&header, sizeof(header));//send tag header
send(strBuf);//send tag data
m_previousTagSize += (strBuf.size() + sizeof(header) + 4);
m_ticker.resetTime();
}

View File

@@ -29,7 +29,7 @@
#include <functional>
#include "Common/config.h"
#include "Rtsp/Rtsp.h"
#include "Network/TcpLimitedSession.h"
#include "Network/TcpSession.h"
#include "Rtmp/RtmpMediaSource.h"
using namespace std;
@@ -40,7 +40,7 @@ namespace ZL {
namespace Http {
class HttpSession: public TcpLimitedSession<MAX_TCP_SESSION> {
class HttpSession: public TcpSession {
public:
typedef StrCaseMap KeyValue;
typedef std::function<void(const string &codeOut,
@@ -50,7 +50,7 @@ public:
HttpSession(const std::shared_ptr<ThreadPool> &pTh, const Socket::Ptr &pSock);
virtual ~HttpSession();
virtual void onRecv(const Socket::Buffer::Ptr &) override;
virtual void onRecv(const Buffer::Ptr &) override;
virtual void onError(const SockException &err) override;
virtual void onManager() override;

View File

@@ -58,7 +58,7 @@ public:
virtual ~HttpsSession(){
//m_sslBox.shutdown();
}
void onRecv(const Socket::Buffer::Ptr &pBuf) override{
void onRecv(const Buffer::Ptr &pBuf) override{
TimeTicker();
m_sslBox.onRecv(pBuf->data(), pBuf->size());
}