#pragma once #include #include #include #include "socket/tcp_socket.h" #include "socket/wsa_manager.h" #include "utils/thread_pool.h" #ifdef __linux__ typedef struct _OVERLAPPED { char dummy; } OVERLAPPED; typedef struct __WSABUF { std::uint32_t len; char* buf; } WSABUF; #endif namespace Socket { class IOCP; enum class IOCPEVENT { QUIT, READ, WRITE }; struct IOCPPASSINDATA { OVERLAPPED overlapped; IOCPEVENT event; std::shared_ptr socket; std::uint32_t recvbytes; std::uint32_t sendbytes; std::uint32_t transferredbytes; WSABUF wsabuf; std::uint32_t bufsize; IOCP* IOCPInstance; #ifdef __linux__ std::shared_ptr> sendQueue; #endif IOCPPASSINDATA(std::uint32_t bufsize) { std::memset(&overlapped, 0, sizeof(overlapped)); event = IOCPEVENT::QUIT; socket = nullptr; recvbytes = 0; sendbytes = 0; transferredbytes = 0; this->bufsize = bufsize; IOCPInstance = nullptr; wsabuf.buf = new char[bufsize]; } IOCPPASSINDATA(const IOCPPASSINDATA& other) : event(other.event), socket(other.socket), transferredbytes(other.transferredbytes), bufsize(other.bufsize), IOCPInstance(other.IOCPInstance) #ifdef __linux__ , sendQueue(other.sendQueue) #endif { std::memset(&overlapped, 0, sizeof(overlapped)); recvbytes = 0; sendbytes = 0; wsabuf.buf = new char[bufsize]; std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); } ~IOCPPASSINDATA() { if (wsabuf.buf != nullptr) delete wsabuf.buf; } IOCPPASSINDATA& operator=(const IOCPPASSINDATA& other) { if (this != &other) { std::memset(&overlapped, 0, sizeof(overlapped)); event = other.event; socket = other.socket; recvbytes = 0; sendbytes = 0; transferredbytes = other.transferredbytes; bufsize = other.bufsize; IOCPInstance = other.IOCPInstance; #ifdef __linux__ sendQueue = other.sendQueue; #endif wsabuf.buf = new char[bufsize]; std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); } return *this; } }; class IOCP { public: #ifdef _WIN32 static void iocpWather( utils::ThreadPool* threadPool, HANDLE completionPort_, std::function callback) { IOCPPASSINDATA* data; SOCKET sock; DWORD cbTransfrred; int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, (LPOVERLAPPED*)&data, INFINITE); if (retVal == 0 || cbTransfrred == 0) { data->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(data->socket->remoteAddr)); } else { data->transferredbytes = cbTransfrred; } threadPool->enqueueJob(callback, data); threadPool->enqueueJob(iocpWather, completionPort_, callback); }; #elif __linux__ static void socketReader( ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { pthread_t tid = pthread_self(); if (event.data.ptr == nullptr) { spdlog::error("invalid call on {}", tid); return; } IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr; std::lock_guard lock(rootIocpData->socket->readMutex); while (true) { char peekBuffer[1]; int rc = rootIocpData->socket->recv(peekBuffer, 1, MSG_PEEK); if (rc > 0) ; else if (rc == 0) { rootIocpData->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(rootIocpData->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, rootIocpData->socket->sock, NULL); threadPool->enqueueJob(callback, rootIocpData); // delete rootIocpData; return; } else { if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::trace("No data to read on {}", tid); return; } else { rootIocpData->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(rootIocpData->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, rootIocpData->socket->sock, NULL); threadPool->enqueueJob(callback, rootIocpData); // delete rootIocpData; return; } } Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA(*rootIocpData); ptr->wsabuf.len = 1500; int redSize = 0; int headerSize = 8; int totalRedSize = 0; while (totalRedSize < headerSize) { redSize = ptr->socket->recv(ptr->buf + totalRedSize, headerSize - totalRedSize, 0); if (redSize == SOCKET_ERROR) { ptr->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(ptr->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); threadPool->enqueueJob(callback, ptr); // delete ptr; return; } else if (redSize == 0) { ptr->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(ptr->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); threadPool->enqueueJob(callback, ptr); // delete ptr; return; } totalRedSize += redSize; } Packet packet; ::memcpy(packet.serialized, ptr->buf, headerSize); redSize = 0; int dataLength = ntohs(packet.__data.packetLength); while (totalRedSize < dataLength + headerSize) { redSize = ptr->socket->recv(ptr->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); if (redSize == SOCKET_ERROR) { if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::trace("No data to read on {}", tid); return; } ptr->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(ptr->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); threadPool->enqueueJob(callback, ptr); // delete ptr; return; } else if (redSize == 0) { ptr->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(ptr->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); threadPool->enqueueJob(callback, ptr); // delete ptr; return; } totalRedSize += redSize; } ptr->transferredbytes = totalRedSize; threadPool->enqueueJob(callback, ptr); } }; static void socketWriter( ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { pthread_t tid = pthread_self(); if (event.data.ptr == nullptr) { spdlog::error("invalid call on {}", tid); return; } IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr; std::lock_guard lock(rootIocpData->socket->writeMutex); while (!rootIocpData->sendQueue->empty()) { IOCPPASSINDATA* data = rootIocpData->sendQueue->front(); rootIocpData->sendQueue->pop(); if (data == nullptr) { spdlog::error("invalid call on {}", tid); break; } int packetSize = data->wsabuf.len; int totalSentSize = 0; int sentSize = 0; spdlog::trace("Sending to: [{}]", (std::string)data->socket->remoteAddr); while (totalSentSize < packetSize) { sentSize = data->socket->send(data->buf + totalSentSize, packetSize - totalSentSize, 0); if (sentSize == SOCKET_ERROR) { if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::warn("buffer full"); continue; } data->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(data->socket->remoteAddr)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, data->socket->sock, NULL); threadPool->enqueueJob(callback, data); // delete data; return; } totalSentSize += sentSize; } data->transferredbytes = totalSentSize; threadPool->enqueueJob(callback, data); } }; static void iocpWatcher( ThreadPool* threadPool, int epollfd, int epollDetroyerFd, std::function callback) { struct epoll_event events[FD_SETSIZE]; pthread_t tid = pthread_self(); int nready = ::epoll_wait(epollfd, events, FD_SETSIZE, -1); for (int i = 0; i < nready; i++) { struct epoll_event current_event = events[i]; if (current_event.events & EPOLLIN) { if (current_event.data.fd == epollDetroyerFd) { return; } std::function task(callback); threadPool->enqueueJob(socketReader, current_event, epollfd, task); } else if (current_event.events & EPOLLOUT) { std::function task(callback); threadPool->enqueueJob(socketWriter, current_event, epollfd, task); } if (--nready <= 0) break; } threadPool->enqueueJob(iocpWatcher, epollfd, epollDetroyerFd, callback); }; #endif IOCP(); ~IOCP(); template void init(utils::ThreadPool* __IOCPThread, _Callable&& callback) { IOCPThread_ = __IOCPThread; #ifdef _WIN32 completionPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (completionPort_ == NULL) { spdlog::critical("CreateIoCompletionPort()"); std::exit(EXIT_FAILURE); } #elif __linux__ epollfd_ = ::epoll_create(1); epollDetroyerFd = ::eventfd(0, EFD_NONBLOCK); struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd = epollDetroyerFd; ::epoll_ctl(epollfd_, EPOLL_CTL_ADD, epollDetroyerFd, &ev); #endif auto boundFunc = [callback = std::move(callback)]( utils::ThreadPool* __IOCPThread, IOCPPASSINDATA* data) mutable { callback(__IOCPThread, data); }; #ifdef _WIN32 int tCount = __IOCPThread->threadCount; spdlog::info("Resizing threadpool size to: {}", tCount * 2); __IOCPThread->respawnWorker(tCount * 2); for (int i = 0; i < tCount; i++) { std::function task(boundFunc); __IOCPThread->enqueueJob(iocpWather, completionPort_, task); } #elif __linux__ __IOCPThread->respawnWorker(__IOCPThread->threadCount + 1); spdlog::info("Spawning 1 Epoll Waiter..."); __IOCPThread->enqueueJob(iocpWatcher, epollfd_, epollDetroyerFd, boundFunc); #endif } void destruct(); void registerSocket(IOCPPASSINDATA* data); int recv(IOCPPASSINDATA* data, int bufferCount); int send(IOCPPASSINDATA* data, int bufferCount, int __flags); #ifdef __linux__ int epollDetroyerFd = -1; #endif private: struct WSAManager* wsaManager = WSAManager::GetInstance(); utils::ThreadPool* IOCPThread_; #ifdef _WIN32 HANDLE completionPort_ = INVALID_HANDLE_VALUE; #elif __linux__ int epollfd_ = -1; std::unordered_map, std::mutex> writeMutex; std::unordered_map, std::queue> writeBuffer; #endif }; } // namespace Socket