添加win测试程序

This commit is contained in:
xiongziliang
2017-12-13 14:18:10 +08:00
parent 34176f0e2b
commit f347bec3cb
100 changed files with 10477 additions and 0 deletions

View File

@@ -0,0 +1,246 @@
//
// Socket.h
// xzl
//
// Created by xzl on 16/4/13.
//
#ifndef Socket_h
#define Socket_h
#include <memory>
#include <string>
#include <deque>
#include <mutex>
#include <atomic>
#include <functional>
#include "Util/util.h"
#include "Util/TimeTicker.h"
#include "Poller/Timer.h"
#include "Network/sockutil.h"
#include "Thread/spin_mutex.h"
using namespace std;
using namespace ZL::Util;
using namespace ZL::Poller;
using namespace ZL::Thread;
namespace ZL {
namespace Network {
#if defined(MSG_NOSIGNAL)
#define FLAG_NOSIGNAL MSG_NOSIGNAL
#else
#define FLAG_NOSIGNAL 0
#endif //MSG_NOSIGNAL
#if defined(MSG_MORE)
#define FLAG_MORE MSG_MORE
#else
#define FLAG_MORE 0
#endif //MSG_MORE
#if defined(MSG_DONTWAIT)
#define FLAG_DONTWAIT MSG_DONTWAIT
#else
#define FLAG_DONTWAIT 0
#endif //MSG_DONTWAIT
#define TCP_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT)
#define UDP_DEFAULE_FLAGS (FLAG_NOSIGNAL | FLAG_DONTWAIT)
#define MAX_SEND_PKT (256)
#if defined(__APPLE__)
#import "TargetConditionals.h"
#if TARGET_IPHONE_SIMULATOR
#define OS_IPHONE
#elif TARGET_OS_IPHONE
#define OS_IPHONE
#endif
#endif //__APPLE__
typedef enum {
Err_success = 0, //成功
Err_eof, //eof
Err_timeout, //超时
Err_refused,
Err_dns,
Err_other,
} ErrCode;
class SockException: public std::exception {
public:
SockException(ErrCode _errCode = Err_success, const string &_errMsg = "") {
errMsg = _errMsg;
errCode = _errCode;
}
void reset(ErrCode _errCode, const string &_errMsg) {
errMsg = _errMsg;
errCode = _errCode;
}
virtual const char* what() const noexcept {
return errMsg.c_str();
}
ErrCode getErrCode() const {
return errCode;
}
operator bool() const{
return errCode != Err_success;
}
private:
string errMsg;
ErrCode errCode;
};
class SockFD
{
public:
typedef std::shared_ptr<SockFD> Ptr;
SockFD(int sock){
_sock = sock;
}
virtual ~SockFD(){
::shutdown(_sock, SHUT_RDWR);
#if defined (OS_IPHONE)
unsetSocketOfIOS(_sock);
#endif //OS_IPHONE
int fd = _sock;
EventPoller::Instance().delEvent(fd,[fd](bool){
close(fd);
});
}
void setConnected(){
#if defined (OS_IPHONE)
setSocketOfIOS(_sock);
#endif //OS_IPHONE
}
int rawFd() const{
return _sock;
}
private:
int _sock;
#if defined (OS_IPHONE)
void *readStream=NULL;
void *writeStream=NULL;
bool setSocketOfIOS(int socket);
void unsetSocketOfIOS(int socket);
#endif //OS_IPHONE
};
class Socket: public std::enable_shared_from_this<Socket> {
public:
class Buffer {
public:
typedef std::shared_ptr<Buffer> Ptr;
Buffer(uint32_t size) {
_size = size;
_data = new char[size];
}
virtual ~Buffer() {
delete[] _data;
}
const char *data() const {
return _data;
}
uint32_t size() const {
return _size;
}
private:
friend class Socket;
char *_data;
uint32_t _size;
};
typedef std::shared_ptr<Socket> Ptr;
typedef function<void(const Buffer::Ptr &buf, struct sockaddr *addr)> onReadCB;
typedef function<void(const SockException &err)> onErrCB;
typedef function<void(Socket::Ptr &sock)> onAcceptCB;
typedef function<bool()> onFlush;
Socket();
virtual ~Socket();
int rawFD() const{
SockFD::Ptr sock;
{
lock_guard<spin_mutex> lck(_mtx_sockFd);
sock = _sockFd;
}
if(!sock){
return -1;
}
return sock->rawFd();
}
void connect(const string &url, uint16_t port, onErrCB &&connectCB, int timeoutSec = 5);
bool listen(const uint16_t port, const char *localIp = "0.0.0.0", int backLog = 1024);
bool bindUdpSock(const uint16_t port, const char *localIp = "0.0.0.0");
void setOnRead(const onReadCB &cb);
void setOnErr(const onErrCB &cb);
void setOnAccept(const onAcceptCB &cb);
void setOnFlush(const onFlush &cb);
int send(const char *buf, int size = 0,int flags = TCP_DEFAULE_FLAGS);
int send(const string &buf,int flags = TCP_DEFAULE_FLAGS);
int sendTo(const char *buf, int size, struct sockaddr *peerAddr,int flags = UDP_DEFAULE_FLAGS);
int sendTo(const string &buf, struct sockaddr *peerAddr,int flags = UDP_DEFAULE_FLAGS);
bool emitErr(const SockException &err);
void enableRecv(bool enabled);
string get_local_ip();
uint16_t get_local_port();
string get_peer_ip();
uint16_t get_peer_port();
void setSendPktSize(uint32_t iPktSize){
_iMaxSendPktSize = iPktSize;
}
private:
mutable spin_mutex _mtx_sockFd;
SockFD::Ptr _sockFd;
//send buffer
recursive_mutex _mtx_sendBuf;
deque<string> _sendPktBuf;
deque<struct sockaddr> _udpSendPeer;
/////////////////////
std::shared_ptr<Timer> _conTimer;
struct sockaddr _peerAddr;
spin_mutex _mtx_read;
spin_mutex _mtx_err;
spin_mutex _mtx_accept;
spin_mutex _mtx_flush;
onReadCB _readCB;
onErrCB _errCB;
onAcceptCB _acceptCB;
onFlush _flushCB;
Ticker _flushTicker;
int _lastSendFlags = TCP_DEFAULE_FLAGS;
uint32_t _iMaxSendPktSize = MAX_SEND_PKT;
atomic_bool _enableRecv;
void closeSock();
bool setPeerSock(int fd, struct sockaddr *addr);
bool attachEvent(const SockFD::Ptr &pSock,bool isUdp = false);
int onAccept(const SockFD::Ptr &pSock,int event);
int onRead(const SockFD::Ptr &pSock,bool mayEof=true);
void onError(const SockFD::Ptr &pSock);
int realSend(const string &buf, struct sockaddr *peerAddr,int flags);
int onWrite(const SockFD::Ptr &pSock, bool bMainThread,int flags,bool isUdp);
void onConnected(const SockFD::Ptr &pSock, const onErrCB &connectCB);
void onFlushed(const SockFD::Ptr &pSock);
void startWriteEvent(const SockFD::Ptr &pSock);
void stopWriteEvent(const SockFD::Ptr &pSock);
bool sendTimeout(bool isUdp);
SockFD::Ptr makeSock(int sock){
return std::make_shared<SockFD>(sock);
}
static SockException getSockErr(const SockFD::Ptr &pSock,bool tryErrno=true);
};
} // namespace Network
} // namespace ZL
#endif /* Socket_h */

View File

@@ -0,0 +1,106 @@
/*
* TcpClient.h
*
* Created on: 2017年2月13日
* Author: xzl
*/
#ifndef SRC_NETWORK_TCPCLIENT_H_
#define SRC_NETWORK_TCPCLIENT_H_
#include <memory>
#include <functional>
#include "Socket.h"
#include "Util/TimeTicker.h"
#include "Thread/WorkThreadPool.h"
#include "Thread/spin_mutex.h"
using namespace std;
using namespace ZL::Util;
using namespace ZL::Thread;
namespace ZL {
namespace Network {
class TcpClient : public std::enable_shared_from_this<TcpClient> {
public:
typedef std::shared_ptr<TcpClient> Ptr;
TcpClient();
virtual ~TcpClient();
protected:
void startConnect(const string &strUrl, uint16_t iPort, int iTimeOutSec = 3);
void shutdown();
virtual int send(const string &str);
virtual int send(const char *str, int len);
bool alive() {
lock_guard<spin_mutex> lck(m_mutex);
return m_pSock.operator bool();
}
string get_local_ip() {
decltype(m_pSock) sockTmp;
{
lock_guard<spin_mutex> lck(m_mutex);
sockTmp = m_pSock;
}
if(!sockTmp){
return "";
}
return sockTmp->get_local_ip();
}
uint16_t get_local_port() {
decltype(m_pSock) sockTmp;
{
lock_guard<spin_mutex> lck(m_mutex);
sockTmp = m_pSock;
}
if(!sockTmp){
return 0;
}
return sockTmp->get_local_port();
}
string get_peer_ip() {
decltype(m_pSock) sockTmp;
{
lock_guard<spin_mutex> lck(m_mutex);
sockTmp = m_pSock;
}
if(!sockTmp){
return "";
}
return sockTmp->get_peer_ip();
}
uint16_t get_peer_port() {
decltype(m_pSock) sockTmp;
{
lock_guard<spin_mutex> lck(m_mutex);
sockTmp = m_pSock;
}
if(!sockTmp){
return 0;
}
return sockTmp->get_peer_port();
}
uint64_t elapsedTime();
//链接成功后客户端将绑定一个后台线程并且onConnect/onRecv/onSend/onErr事件将在该后台线程触发
virtual void onConnect(const SockException &ex) {}
virtual void onRecv(const Socket::Buffer::Ptr &pBuf) {}
virtual void onSend() {}
virtual void onErr(const SockException &ex) {}
Socket::Ptr m_pSock;
private:
Ticker m_ticker;
spin_mutex m_mutex;
void onSockConnect(const SockException &ex);
void onSockRecv(const Socket::Buffer::Ptr &pBuf);
void onSockSend();
void onSockErr(const SockException &ex);
};
} /* namespace Network */
} /* namespace ZL */
#endif /* SRC_NETWORK_TCPCLIENT_H_ */

View File

@@ -0,0 +1,64 @@
/*
* Session.h
*
* Created on: 2015年10月27日
* Author: root
*/
#ifndef SERVER_LIMITEDSESSION_H_
#define SERVER_LIMITEDSESSION_H_
#include <memory>
#include "Util/logger.h"
#include "TcpSession.h"
using namespace std;
using namespace ZL::Util;
namespace ZL {
namespace Network {
template<int MaxCount>
class TcpLimitedSession: public TcpSession {
public:
TcpLimitedSession(const std::shared_ptr<ThreadPool> &_th, const Socket::Ptr &_sock) :
TcpSession(_th,_sock) {
lock_guard<recursive_mutex> lck(stackMutex());
static uint64_t maxSeq(0);
sessionSeq = maxSeq++;
auto &stack = getStack();
stack.emplace(this);
if(stack.size() > MaxCount){
auto it = stack.begin();
(*it)->safeShutdown();
stack.erase(it);
WarnL << "超过TCP个数限制:" << MaxCount;
}
}
virtual ~TcpLimitedSession() {
lock_guard<recursive_mutex> lck(stackMutex());
getStack().erase(this);
}
private:
uint64_t sessionSeq; //会话栈顺序
struct Comparer {
bool operator()(TcpLimitedSession *x, TcpLimitedSession *y) const {
return x->sessionSeq < y->sessionSeq;
}
};
static recursive_mutex &stackMutex(){
static recursive_mutex mtx;
return mtx;
}
//RTSP会话栈,先创建的在前面
static set<TcpLimitedSession *, Comparer> &getStack(){
static set<TcpLimitedSession *, Comparer> stack;
return stack;
}
};
} /* namespace Session */
} /* namespace ZL */
#endif /* SERVER_LIMITEDSESSION_H_ */

View File

@@ -0,0 +1,134 @@
/*
* TcpServer.h
*
* Created on: 2016年8月9日
* Author: xzl
*/
#ifndef TCPSERVER_TCPSERVER_H_
#define TCPSERVER_TCPSERVER_H_
#include <memory>
#include <exception>
#include <functional>
#include "Socket.h"
#include "Util/util.h"
#include "Util/uv_errno.h"
#include "Util/logger.h"
#include "Poller/Timer.h"
#include "Thread/semaphore.h"
#include "Thread/WorkThreadPool.h"
using namespace std;
using namespace ZL::Util;
using namespace ZL::Poller;
using namespace ZL::Thread;
namespace ZL {
namespace Network {
template<typename Session>
class TcpServer {
public:
typedef std::shared_ptr<TcpServer> Ptr;
TcpServer() {
socket.reset(new Socket());
sessionMap.reset(new typename decltype(sessionMap)::element_type);
}
~TcpServer() {
TraceL << "start clean...";
timer.reset();
socket.reset();
typename decltype(sessionMap)::element_type copyMap;
sessionMap->swap(copyMap);
for (auto it = copyMap.begin(); it != copyMap.end(); ++it) {
auto session = it->second;
it->second->async_first( [session]() {
session->onError(SockException(Err_other,"Tcp server shutdown!"));
});
}
TraceL << "clean completed!";
}
void start(uint16_t port, const std::string& host = "0.0.0.0", uint32_t backlog = 1024) {
bool success = socket->listen(port, host.c_str(), backlog);
if (!success) {
string err = (StrPrinter << "listen on " << host << ":" << port << "] failed:" << get_uv_errmsg(true)).operator <<(endl);
throw std::runtime_error(err);
}
socket->setOnAccept( bind(&TcpServer::onAcceptConnection, this, placeholders::_1));
timer.reset(new Timer(2, [this]()->bool {
this->onManagerSession();
return true;
}));
InfoL << "TCP Server listening on " << host << ":" << port;
}
private:
Socket::Ptr socket;
std::shared_ptr<Timer> timer;
std::shared_ptr<std::unordered_map<Socket *, std::shared_ptr<Session> > > sessionMap;
void onAcceptConnection(const Socket::Ptr & sock) {
// 接收到客户端连接请求
auto session(std::make_shared<Session>(WorkThreadPool::Instance().getWorkThread(), sock));
auto sockPtr(sock.get());
auto sessionMapTmp(sessionMap);
weak_ptr<Session> weakSession(session);
sessionMapTmp->emplace(sockPtr, session);
// 会话接收数据事件
sock->setOnRead([weakSession](const Socket::Buffer::Ptr &buf, struct sockaddr *addr){
//获取会话强应用
auto strongSession=weakSession.lock();
if(!strongSession) {
//会话对象已释放
return;
}
//在会话线程中执行onRecv操作
strongSession->async([weakSession,buf]() {
auto strongSession=weakSession.lock();
if(!strongSession) {
return;
}
strongSession->onRecv(buf);
});
});
//会话接收到错误事件
sock->setOnErr([weakSession,sockPtr,sessionMapTmp](const SockException &err){
//获取会话强应用
auto strongSession=weakSession.lock();
//移除掉会话
sessionMapTmp->erase(sockPtr);
if(!strongSession) {
//会话对象已释放
return;
}
//在会话线程中执行onError操作
strongSession->async_first([strongSession,err]() {
strongSession->onError(err);
});
});
}
void onManagerSession() {
//DebugL<<EventPoller::Instance().isMainThread();
for (auto &pr : *sessionMap) {
weak_ptr<Session> weakSession = pr.second;
pr.second->async([weakSession]() {
auto strongSession=weakSession.lock();
if(!strongSession) {
return;
}
strongSession->onManager();
});
}
}
};
} /* namespace Network */
} /* namespace ZL */
#endif /* TCPSERVER_TCPSERVER_H_ */

View File

@@ -0,0 +1,91 @@
/*
* Session.h
*
* Created on: 2015年10月27日
* Author: root
*/
#ifndef SERVER_SESSION_H_
#define SERVER_SESSION_H_
#include <memory>
#include "Socket.h"
#include "Util/logger.h"
#include "Thread/ThreadPool.h"
using namespace std;
using namespace ZL::Util;
using namespace ZL::Thread;
namespace ZL {
namespace Network {
class TcpSession: public std::enable_shared_from_this<TcpSession> {
public:
TcpSession(const std::shared_ptr<ThreadPool> &_th, const Socket::Ptr &_sock) :
sock(_sock), th(_th) {
localIp = sock->get_local_ip();
peerIp = sock->get_peer_ip();
localPort = sock->get_local_port();
peerPort = sock->get_peer_port();
}
virtual ~TcpSession() {
}
virtual void onRecv(const Socket::Buffer::Ptr &) =0;
virtual void onError(const SockException &err) =0;
virtual void onManager() =0;
template <typename T>
void async(T &&task) {
th->async(std::forward<T>(task));
}
template <typename T>
void async_first(T &&task) {
th->async_first(std::forward<T>(task));
}
protected:
const string& getLocalIp() const {
return localIp;
}
const string& getPeerIp() const {
return peerIp;
}
uint16_t getLocalPort() const {
return localPort;
}
uint16_t getPeerPort() const {
return peerPort;
}
virtual void shutdown() {
sock->emitErr(SockException(Err_other, "self shutdown"));
}
void safeShutdown(){
std::weak_ptr<TcpSession> weakSelf = shared_from_this();
async_first([weakSelf](){
auto strongSelf = weakSelf.lock();
if(strongSelf){
strongSelf->shutdown();
}
});
}
virtual int send(const string &buf) {
return sock->send(buf);
}
virtual int send(const char *buf, int size) {
return sock->send(buf, size);
}
Socket::Ptr sock;
private:
std::shared_ptr<ThreadPool> th;
string localIp;
string peerIp;
uint16_t localPort;
uint16_t peerPort;
};
} /* namespace Session */
} /* namespace ZL */
#endif /* SERVER_SESSION_H_ */

View File

@@ -0,0 +1,86 @@
#ifndef SOCKUTIL_H
#define SOCKUTIL_H
#if defined(_WIN32)
#include <WinSock2.h>
#include <Iphlpapi.h>
#pragma comment (lib,"WS2_32")
#pragma comment(lib,"Iphlpapi.lib")
#else
#include <netdb.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <net/if.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#endif // defined(_WIN32)
#include <string>
#include <string.h>
#include <stdint.h>
using namespace std;
#if defined(_WIN32)
#ifndef socklen_t
#define socklen_t int
#endif //!socklen_t
#ifndef SHUT_RDWR
#define SHUT_RDWR 2
#endif //!SHUT_RDWR
int ioctl(int fd, long cmd, u_long *ptr);
int close(int fd);
#endif // defined(_WIN32)
namespace ZL {
namespace Network {
class SockUtil {
public:
static int connect(const char *host, uint16_t port, bool bAsync = true);
static int listen(const uint16_t port, const char *localIp = "0.0.0.0",
int backLog = 1024);
static int bindUdpSock(const uint16_t port,
const char *localIp = "0.0.0.0");
static int setNoDelay(int sockFd, bool on = true);
static int setNoSigpipe(int sock);
static int setNoBlocked(int sock, bool noblock = true);
static int setRecvBuf(int sock, int size = 256 * 1024);
static int setSendBuf(int sock, int size = 256 * 1024);
static int setReuseable(int sockFd, bool on = true);
static int setBroadcast(int sockFd, bool on = true);
static int setKeepAlive(int sockFd, bool on = true);
//组播相关
static int setMultiTTL(int sockFd, uint8_t ttl = 64);
static int setMultiIF(int sockFd, const char *strLocalIp);
static int setMultiLOOP(int sockFd, bool bAccept = false);
static int joinMultiAddr(int sockFd, const char *strAddr, const char* strLocalIp = "0.0.0.0");
static int leaveMultiAddr(int sockFd, const char *strAddr, const char* strLocalIp = "0.0.0.0");
static int joinMultiAddrFilter(int sockFd, const char* strAddr, const char* strSrcIp, const char* strLocalIp = "0.0.0.0");
static int leaveMultiAddrFilter(int sockFd, const char* strAddr, const char* strSrcIp, const char* strLocalIp = "0.0.0.0");
static int getSockError(int sockFd);
static int setCloseWait(int sockFd, int second = 0);
static string get_local_ip(int fd);
static string get_local_ip();
static uint16_t get_local_port(int fd);
static string get_peer_ip(int fd);
static uint16_t get_peer_port(int fd);
static string get_ifr_name(const char *localIp);
static string get_ifr_mask(const char *ifrName);
static string get_ifr_brdaddr(const char *ifrName);
static bool in_same_lan(const char *myIp, const char *dsrIp);
};
} // namespace Network
} // namespace ZL
#endif // !SOCKUTIL_H