diff --git a/.vscode/settings.json b/.vscode/settings.json index c986040..745cf78 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -85,6 +85,7 @@ "valarray": "cpp", "__tree": "cpp", "map": "cpp", - "iostream": "cpp" + "iostream": "cpp", + "any": "cpp" } } \ No newline at end of file diff --git a/Server/src/ServerManager/ServerManager.cpp b/Server/src/ServerManager/ServerManager.cpp index e062122..02d2e44 100644 --- a/Server/src/ServerManager/ServerManager.cpp +++ b/Server/src/ServerManager/ServerManager.cpp @@ -8,6 +8,8 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA Chattr::Packet pack; int packetSize = data->transferredbytes; + memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len); + if (data->event == IOCPEVENT::WRITE && data->transferredbytes >= data->wsabuf.len) { data->event = IOCPEVENT::READ; data->wsabuf.len = 1500; @@ -15,8 +17,6 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA return; } - memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len); - std::uint16_t packetLength = ::ntohs(pack.__data.packetLength); if (data->event == IOCPEVENT::READ && data->transferredbytes < packetLength + 8) { @@ -226,12 +226,11 @@ void ServerManager::processRoomListRequest(RoomListRequestPacket roomListRequest int packetLength = roomListResponsePacket.__data.packetLength; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memcpy(ptr, data, sizeof(IOCPPASSINDATA)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); - ptr->socket = data->socket; ptr->recvbytes = ptr->sendbytes = 0; ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.len = packetLength + 8; - ptr->IOCPInstance = data->IOCPInstance; roomListResponsePacket.convToN(); memcpy(ptr->wsabuf.buf, roomListResponsePacket.serialized, packetLength + 8); @@ -303,12 +302,11 @@ void ServerManager::processUsersListRequestPacket(UsersListRequestPacket usersLi int packetLength = usersListResponsePacket.__data.packetLength; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memcpy(ptr, data, sizeof(IOCPPASSINDATA)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); - ptr->socket = data->socket; ptr->recvbytes = ptr->sendbytes = 0; ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.len = packetLength + 8; - ptr->IOCPInstance = data->IOCPInstance; usersListResponsePacket.convToN(); memcpy(ptr->wsabuf.buf, usersListResponsePacket.serialized, packetLength + 8); @@ -351,12 +349,12 @@ void ServerManager::processDataPostPacket(DataPostPacket dataPostPacket, IOCPPAS for (auto dest : destinationSockets) { Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memcpy(ptr, data, sizeof(IOCPPASSINDATA)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ptr->socket = dest; ptr->recvbytes = ptr->sendbytes = 0; ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.len = packetLength + 6; - ptr->IOCPInstance = data->IOCPInstance; dataPostPacket.convToN(); memcpy(ptr->wsabuf.buf, dataPostPacket.serialized, packetLength + 6); @@ -399,12 +397,12 @@ void ServerManager::processContinuePacket(ContinuePacket continuePacket, IOCPPAS for (auto dest : destinationSockets) { Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memcpy(ptr, data, sizeof(IOCPPASSINDATA)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ptr->socket = dest; ptr->recvbytes = ptr->sendbytes = 0; ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.len = packetLength + 6; - ptr->IOCPInstance = data->IOCPInstance; continuePacket.convToN(); memcpy(ptr->wsabuf.buf, continuePacket.serialized, packetLength + 6); @@ -480,8 +478,6 @@ void ServerManager::run() { while (true) { spdlog::info("Waiting for connection..."); listenSock_.accept(clientSock_, clientAddr_); - bool enable = true; - clientSock_.setsockopt(SOL_SOCKET, SO_KEEPALIVE, (const char*)&enable, sizeof(enable)); Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ptr->socket = std::make_shared(std::move(clientSock_)); diff --git a/impl/Socket/IOCP.cpp b/impl/Socket/IOCP.cpp index f45e38c..a9a4452 100644 --- a/impl/Socket/IOCP.cpp +++ b/impl/Socket/IOCP.cpp @@ -20,6 +20,7 @@ void IOCP::destruct() { } void IOCP::registerSocket(IOCPPASSINDATA* data) { + data->event = IOCPEVENT::READ; #ifdef _WIN32 HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0); if (returnData == 0) @@ -30,7 +31,8 @@ void IOCP::registerSocket(IOCPPASSINDATA* data) { // fcntl(data->socket->sock, F_SETFL, flags); struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; + ev.events = EPOLLIN | EPOLLONESHOT; + data->sendQueue = std::make_shared>(); ev.data.ptr = data; int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket->sock, &ev); if (rc < 0) @@ -45,7 +47,7 @@ int IOCP::recv(Chattr::IOCPPASSINDATA* data, int bufferCount) { return ::WSARecv(data->socket->sock, &data->wsabuf, bufferCount, &recvbytes, &flags, &data->overlapped, NULL); #elif __linux__ struct epoll_event ev; - ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; + ev.events = EPOLLIN | EPOLLONESHOT; ev.data.ptr = data; return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); #endif @@ -58,11 +60,9 @@ int IOCP::send(Chattr::IOCPPASSINDATA* data, int bufferCount, int __flags, bool return ::WSASend(data->socket->sock, &data->wsabuf, bufferCount, &sendbytes, __flags, &data->overlapped, NULL); #elif __linux__ struct epoll_event ev; - if (client) - ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT; - else - ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT; + ev.events = EPOLLOUT | EPOLLONESHOT; ev.data.ptr = data; + data->sendQueue->push(data); return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); #endif } diff --git a/include/Socket/IOCP.hpp b/include/Socket/IOCP.hpp index 124f855..b7b03ab 100644 --- a/include/Socket/IOCP.hpp +++ b/include/Socket/IOCP.hpp @@ -6,6 +6,7 @@ #include "Packet/Packet.hpp" #include #include +#include #include "precomp.hpp" @@ -42,6 +43,9 @@ struct IOCPPASSINDATA { std::uint32_t transferredbytes; WSABUF wsabuf; IOCP* IOCPInstance; +#ifdef __linux__ + std::shared_ptr> sendQueue; +#endif }; class IOCP { @@ -66,100 +70,110 @@ public: }; #elif __linux__ static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { - IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; - pthread_t tid = pthread_self(); - if (iocpData == nullptr) { + if (event.data.ptr == nullptr) { spdlog::error("invalid call on {}", tid); return; } + + IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr; - std::lock_guard lock(iocpData->socket->readMutex); + std::lock_guard lock(rootIocpData->socket->readMutex); + char peekBuffer[1]; + while (rootIocpData->socket->recv(peekBuffer, 1, MSG_PEEK)) { + Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memcpy(ptr, rootIocpData, sizeof(IOCPPASSINDATA)); + ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); + ptr->recvbytes = ptr->sendbytes = 0; + ptr->wsabuf.buf = ptr->buf; + ptr->wsabuf.len = 1500; - spdlog::trace("reading on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); + int redSize = 0; + int headerSize = 8; + int totalRedSize = 0; - int redSize = 0; - int headerSize = 8; - int totalRedSize = 0; - - while (totalRedSize < headerSize) { - redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, headerSize - totalRedSize, 0); - - if (redSize == SOCKET_ERROR) { - spdlog::error("recv() [{}]", strerror(errno)); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; + while (totalRedSize < headerSize) { + redSize = ptr->socket->recv(ptr->buf + totalRedSize, headerSize - totalRedSize, 0); + + if (redSize == SOCKET_ERROR) { + spdlog::error("recv() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); + delete ptr; + return; + } + else if (redSize == 0) { + spdlog::debug("Client disconnected. [{}]", (std::string)ptr->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); + delete ptr; + return; + } + totalRedSize += redSize; } - else if (redSize == 0) { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; + + Packet packet; + ::memcpy(packet.serialized, ptr->buf, headerSize); + + redSize = 0; + int dataLength = ntohs(packet.__data.packetLength); + + while (totalRedSize < dataLength + headerSize) { + redSize = ptr->socket->recv(ptr->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); + + if (redSize == SOCKET_ERROR) { + spdlog::error("recv() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); + delete ptr; + return; + } + else if (redSize == 0) { + spdlog::debug("Client disconnected. [{}]", (std::string)ptr->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL); + delete ptr; + return; + } + totalRedSize += redSize; } - totalRedSize += redSize; + ptr->transferredbytes = totalRedSize; + threadPool->enqueueJob(callback, ptr); } - - Packet packet; - ::memcpy(packet.serialized, iocpData->buf, headerSize); - - redSize = 0; - int dataLength = ntohs(packet.__data.packetLength); - - while (totalRedSize < dataLength + headerSize) { - redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); - - if (redSize == SOCKET_ERROR) { - spdlog::error("recv() [{}]", strerror(errno)); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; - } - else if (redSize == 0) { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; - } - totalRedSize += redSize; - } - iocpData->transferredbytes = totalRedSize; - threadPool->enqueueJob(callback, iocpData); }; static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { - IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; - pthread_t tid = pthread_self(); - if (iocpData == nullptr) { + if (event.data.ptr == nullptr) { spdlog::error("invalid call on {}", tid); return; } + + IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr; - std::lock_guard lock(iocpData->socket->writeMutex); + std::lock_guard lock(rootIocpData->socket->writeMutex); + auto queue = rootIocpData->sendQueue; + while (!queue->empty()) { + IOCPPASSINDATA* data = queue->front(); + queue->pop(); - spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); + int packetSize = data->wsabuf.len; + int totalSentSize = 0; + int sentSize = 0; - int packetSize = iocpData->wsabuf.len; - int totalSentSize = 0; - int sentSize = 0; + spdlog::trace("Sending to: [{}]", (std::string)data->socket->remoteAddr); - spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr); + while (totalSentSize < packetSize) { + sentSize = data->socket->send(data->buf + totalSentSize, packetSize - totalSentSize, 0); - while (totalSentSize < packetSize) { - sentSize = iocpData->socket->send(iocpData->buf + totalSentSize, packetSize - totalSentSize, 0); - - if (sentSize == SOCKET_ERROR) { - spdlog::error("send() [{}]", strerror(errno)); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); - delete iocpData; - return; + if (sentSize == SOCKET_ERROR) { + spdlog::error("send() [{}]", strerror(errno)); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, data->socket->sock, NULL); + delete data; + return; + } + totalSentSize += sentSize; } - totalSentSize += sentSize; + data->transferredbytes = totalSentSize; + threadPool->enqueueJob(callback, data); } - iocpData->transferredbytes = totalSentSize; - threadPool->enqueueJob(callback, iocpData); }; static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function callback) { struct epoll_event events[FD_SETSIZE];