diff --git a/Server/include/ServerManager/ServerManager.hpp b/Server/include/ServerManager/ServerManager.hpp index e210f3c..b10ba04 100644 --- a/Server/include/ServerManager/ServerManager.hpp +++ b/Server/include/ServerManager/ServerManager.hpp @@ -16,7 +16,7 @@ public: void init(_Callable _IOCPClient) { auto config = ConfigManager::load(); log::setDefaultLogger(config.logLevel, config.logFileName, config.logfileSize, config.logfileCount); - threadPool_.init(0); + threadPool_.init(2); iocp_.init(&threadPool_, _IOCPClient); struct Address serveraddr; diff --git a/Server/src/ServerManager/ServerManager.cpp b/Server/src/ServerManager/ServerManager.cpp index 47534de..19dc5e5 100644 --- a/Server/src/ServerManager/ServerManager.cpp +++ b/Server/src/ServerManager/ServerManager.cpp @@ -182,9 +182,9 @@ void ServerManager::processLoginRequestPacket(LoginRequestPacket loginRequestPac ::memcpy(loginResponsePacket.__data.yourId, &yourId, sizeof(Snowflake)); loginResponsePacket.convToN(); - memcpy(data->wsabuf.buf, loginResponsePacket.serialized, 16); - data->sendbytes = 16; - data->wsabuf.len = 16; + memcpy(data->wsabuf.buf, loginResponsePacket.serialized, 18); + data->sendbytes = 18; + data->wsabuf.len = 18; data->IOCPInstance->send(data, 1, 0); } diff --git a/impl/Socket/IOCP.cpp b/impl/Socket/IOCP.cpp index fd1f756..c8634d0 100644 --- a/impl/Socket/IOCP.cpp +++ b/impl/Socket/IOCP.cpp @@ -5,6 +5,20 @@ namespace Chattr { +IOCP::IOCP() { + +} + +IOCP::~IOCP() { + destruct(); +} + +void IOCP::destruct() { +#ifdef __linux__ + close(epollfd_); +#endif +} + void IOCP::registerSocket(Chattr::IOCPPASSINDATA* data) { #ifdef _WIN32 HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0); diff --git a/include/Socket/IOCP.hpp b/include/Socket/IOCP.hpp index 345e598..892e9f0 100644 --- a/include/Socket/IOCP.hpp +++ b/include/Socket/IOCP.hpp @@ -9,7 +9,7 @@ #include "precomp.hpp" -#ifndef _WIN32 +#ifdef __linux__ typedef struct _OVERLAPPED { char dummy; @@ -67,7 +67,7 @@ public: #elif __linux__ static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; - + pthread_t tid = pthread_self(); if (iocpData == nullptr) { @@ -84,25 +84,25 @@ public: int totalRedSize = 0; while (totalRedSize < headerSize) { - redSize = iocpData->socket->recv(iocpData->buf, headerSize - totalRedSize, 0); + redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, headerSize - totalRedSize, 0); - if (redSize <= 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - spdlog::trace("{}", strerror(errno)); - break; - } else { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; - } - + if (redSize == SOCKET_ERROR) { + spdlog::error("recv() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; + } + else if (redSize == 0) { + spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; } totalRedSize += redSize; } Packet packet; - ::memcpy(&packet.serialized, iocpData->buf, headerSize); + ::memcpy(packet.serialized, iocpData->buf, headerSize); redSize = 0; int dataLength = ntohs(packet.__data.packetLength); @@ -110,17 +110,17 @@ public: while (totalRedSize < dataLength + headerSize) { redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); - if (redSize <= 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - spdlog::trace("{}", strerror(errno)); - break; - } else { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; - } - + if (redSize == SOCKET_ERROR) { + spdlog::error("recv() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; + } + else if (redSize == 0) { + spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; } totalRedSize += redSize; } @@ -141,25 +141,20 @@ public: spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); - int packetSize = iocpData->transferredbytes; + int packetSize = iocpData->wsabuf.len; int totalSentSize = 0; int sentSize = 0; spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr); while (totalSentSize < packetSize) { - sentSize = iocpData->socket->send(iocpData->buf, packetSize - totalSentSize, 0); + sentSize = iocpData->socket->send(iocpData->buf + totalSentSize, packetSize - totalSentSize, 0); - if (sentSize <= 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - spdlog::trace("{}", strerror(errno)); - break; - } else { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; - } + if (sentSize == SOCKET_ERROR) { + spdlog::error("send() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; } totalSentSize += sentSize; } @@ -184,11 +179,16 @@ public: std::function task(callback); threadPool->enqueueJob(socketWriter, current_event, epollfd, task); } + if (--nready <= 0) + break; } threadPool->enqueueJob(iocpWatcher, epollfd, callback); }; #endif + IOCP(); + ~IOCP(); + template void init(ThreadPool* __IOCPThread, _Callable&& callback) { IOCPThread_ = __IOCPThread; @@ -215,24 +215,29 @@ public: __IOCPThread->enqueueJob(iocpWather, completionPort_, task); } #elif __linux__ + __IOCPThread->respawnWorker(__IOCPThread->threadCount + 1); spdlog::info("Spawning 1 Epoll Waiter..."); __IOCPThread->enqueueJob(iocpWatcher, epollfd_, boundFunc); #endif } - void registerSocket(Chattr::IOCPPASSINDATA* data); + void destruct(); - int recv(Chattr::IOCPPASSINDATA* data, int bufferCount); - int send(Chattr::IOCPPASSINDATA* data, int bufferCount, int __flags); + void registerSocket(IOCPPASSINDATA* data); + + int recv(IOCPPASSINDATA* data, int bufferCount); + int send(IOCPPASSINDATA* data, int bufferCount, int __flags); private: - struct Chattr::WSAManager wsaManager; + struct WSAManager wsaManager; ThreadPool* IOCPThread_; #ifdef _WIN32 HANDLE completionPort_ = INVALID_HANDLE_VALUE; #elif __linux__ int epollfd_ = -1; + std::unordered_map, std::mutex> writeMutex; + std::unordered_map, std::queue> writeBuffer; #endif };