From 181d30ff282cc424589ba49daa08d91aebadffe8 Mon Sep 17 00:00:00 2001 From: HappyTanuki Date: Fri, 2 May 2025 19:30:37 +0900 Subject: [PATCH] =?UTF-8?q?=EC=9D=BC=EB=8B=A8=20iocp=EB=8A=94=20=EC=99=84?= =?UTF-8?q?=EC=84=B1=ED=95=9C=20=EB=93=AF=20=EC=9C=88=EB=8F=84=EC=9A=B0?= =?UTF-8?q?=EC=97=90=EC=84=9C=EB=A7=8C=20=EB=90=98=EB=A9=B4=20=EB=90=A0=20?= =?UTF-8?q?=EB=93=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/settings.json | 4 +- CMakeLists.txt | 1 + Client/src/client.cpp | 4 +- Server/src/server.cpp | 32 ++++++------ impl/Socket/IOCP.cpp | 23 ++++++-- impl/Utils/Thread.cpp | 4 +- impl/Utils/ThreadPool.cpp | 8 ++- include/Socket/IOCP.hpp | 107 ++++++++++++++++++++++++++++++-------- include/Utils/Thread.hpp | 15 ++++-- include/precomp.hpp | 2 + 10 files changed, 147 insertions(+), 53 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index a988a14..26180ef 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -79,6 +79,8 @@ "set": "cpp", "util": "cpp", "expected": "cpp", - "complex": "cpp" + "complex": "cpp", + "__config": "cpp", + "ranges": "cpp" } } \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt index 7867ffc..3a5bd9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,6 +1,7 @@ cmake_minimum_required(VERSION 3.5) set(PROJECT_NAME "Chattring") +set(CMAKE_BUILD_TYPE Debug) if(NOT CMAKE_BUILD_TYPE) set(CMAKE_BUILD_TYPE Release CACHE STRING "Choose the type of build." FORCE) endif() diff --git a/Client/src/client.cpp b/Client/src/client.cpp index 147edce..261b103 100644 --- a/Client/src/client.cpp +++ b/Client/src/client.cpp @@ -8,7 +8,7 @@ int main() { Chattr::TCPSocket sock; sock.init(AF_INET6); - Chattr::Address serveraddr(AF_INET6, "fd8a:5f:3adb:774d:dfae:983f:2e7a:ffba", 9010); + Chattr::Address serveraddr(AF_INET6, "::1", 9011); sock.connect(serveraddr); spdlog::info("Connection established from {}", (std::string)serveraddr); @@ -27,5 +27,5 @@ int main() { sock.recv(&packet.serialized, 1500, 0); packet.convToH(); - // sleep(1000000); + sleep(1000000); } \ No newline at end of file diff --git a/Server/src/server.cpp b/Server/src/server.cpp index d359276..2fcf01a 100644 --- a/Server/src/server.cpp +++ b/Server/src/server.cpp @@ -23,14 +23,11 @@ int main() { Chattr::ThreadPool threadPool(0); Chattr::IOCP iocp; iocp.init(&threadPool, _IOCPClient); - // struct Chattr::WSAManager wsaManager; Chattr::TCPSocket sock; struct Chattr::Address serveraddr; Chattr::TCPSocket clientSock; struct Chattr::Address clientAddr; - bool enable = true; - sock.setsockopt(SOL_SOCKET, SO_KEEPALIVE, (const char*)&enable, sizeof(enable)); if (config.ipVersion == 4) { sock.init(AF_INET); serveraddr.set(AF_INET, INADDR_ANY, config.listenPort); @@ -64,7 +61,7 @@ int main() { ptr->wsabuf.len = 1500; ptr->IOCPInstance = &iocp; - iocp.registerSocket(ptr->socket.sock); + iocp.registerSocket(ptr); int returnData = iocp.recv(ptr); } @@ -76,17 +73,8 @@ void _IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDATA* data) { int totalRedSize = 0; int redSize = 0; - // spdlog::info("Receving from: [{}]", (std::string)sock.remoteAddr); - - //while (totalRedSize < packetSize) { - // redSize = data->socket.recv(&pack.serialized, 1500 - totalRedSize, 0); - // - // if (redSize <= 0) { - // // spdlog::info("Client disconnected. [{}]", (std::string)sock.remoteAddr); - // return; - // } - // totalRedSize += redSize; - //} + if (data->wsabuf.len == data->sendbytes) + return; memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len); @@ -94,7 +82,6 @@ void _IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDATA* data) { bool packetError = false; Chattr::DataPostPacket dataPostPacket; - Chattr::ResponsePacket packet; std::queue responsePackets; @@ -166,7 +153,18 @@ void _IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDATA* data) { break; } - int returnData = data->IOCPInstance->recv(data); + packet.__data.packetType = Chattr::PacketType::PACKET_RESPONSE; + packet.__data.requestType = Chattr::RequestType::DATA; + packet.__data.dataType = Chattr::DataType::TEXT; + packet.__data.packetLength = sizeof(Chattr::ResponseStatusCode); + packet.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST; + packet.convToN(); + + memcpy(data->wsabuf.buf, packet.serialized, data->wsabuf.len); + + int returnData = data->IOCPInstance->send(data, 0); + + // int returnData = data->IOCPInstance->recv(data); } void _TCPSendClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, std::queue packets) { diff --git a/impl/Socket/IOCP.cpp b/impl/Socket/IOCP.cpp index 708bb1a..4460fce 100644 --- a/impl/Socket/IOCP.cpp +++ b/impl/Socket/IOCP.cpp @@ -5,13 +5,22 @@ namespace Chattr { -void IOCP::registerSocket(SOCKET sock) { +void IOCP::registerSocket(Chattr::IOCPPASSINDATA* data) { #ifdef _WIN32 - HANDLE returnData = ::CreateIoCompletionPort((HANDLE)sock, completionPort_, sock, 0); + HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket.sock, completionPort_, data->socket.sock, 0); if (returnData == 0) completionPort_ = returnData; #elif __linux__ + // int flags = ::fcntl(data->socket.sock, F_GETFL); + // flags |= O_NONBLOCK; + // fcntl(data->socket.sock, F_SETFL, flags); + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLONESHOT; + ev.data.ptr = data; + int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket.sock, &ev); + if (rc < 0) + log::critical("epoll_ctl()"); #endif } @@ -20,7 +29,10 @@ int IOCP::recv(Chattr::IOCPPASSINDATA* data) { DWORD recvbytes = 0, flags = 0; return ::WSARecv(data->socket.sock, &data->wsabuf, 1, &recvbytes, &flags, &data->overlapped, NULL); #elif __linux__ - return -1; + struct epoll_event ev; + ev.events = EPOLLIN | EPOLLONESHOT; + ev.data.ptr = data; + return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket.sock, &ev); #endif } @@ -29,7 +41,10 @@ int IOCP::send(Chattr::IOCPPASSINDATA* data, int __flags) { DWORD sendbytes = 0; return ::WSASend(data->socket.sock, &data->wsabuf, 1, &sendbytes, __flags, &data->overlapped, NULL); #elif __linux__ - return -1; + struct epoll_event ev; + ev.events = EPOLLOUT | EPOLLONESHOT; + ev.data.ptr = data; + return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket.sock, &ev); #endif } diff --git a/impl/Utils/Thread.cpp b/impl/Utils/Thread.cpp index 7fbb60b..4a69022 100644 --- a/impl/Utils/Thread.cpp +++ b/impl/Utils/Thread.cpp @@ -4,10 +4,12 @@ namespace Chattr { Thread::Thread(Thread&& other) noexcept { other.detach(); + memcpy(this, &other, sizeof(Thread)); } Thread& Thread::operator=(Thread&& other) noexcept { other.detach(); + memcpy(this, &other, sizeof(Thread)); return *this; } @@ -16,8 +18,6 @@ Thread::~Thread() { spdlog::critical("There is not joined thread"); std::exit(EXIT_FAILURE); } - if (returnValuePtr != nullptr) - delete returnValuePtr; } void Thread::detach() { diff --git a/impl/Utils/ThreadPool.cpp b/impl/Utils/ThreadPool.cpp index 4ce4ef2..ef51c0c 100644 --- a/impl/Utils/ThreadPool.cpp +++ b/impl/Utils/ThreadPool.cpp @@ -63,11 +63,16 @@ void* ThreadPool::Worker() { #elif __linux__ pthread_t pid = pthread_self(); #endif + spdlog::debug("ThreadPool Worker : {} up", pid); while (!terminate_) { std::unique_lock lock(jobQueueMutex); jobQueueCV_.wait(lock, [this]() { return !this->jobs_.empty() || terminate_; }); - if (this->jobs_.empty()) + if (this->jobs_.empty() || terminate_) { + jobs_ = std::queue>(); break; + } + if (this->jobs_.empty()) + continue; auto job = std::move(jobs_.front()); jobs_.pop(); @@ -76,6 +81,7 @@ void* ThreadPool::Worker() { spdlog::debug("ThreadPool Worker : {} Executing a job", pid); job(); } + spdlog::debug("ThreadPool Worker : {} down", pid); return nullptr; } diff --git a/include/Socket/IOCP.hpp b/include/Socket/IOCP.hpp index a48e260..0ae552d 100644 --- a/include/Socket/IOCP.hpp +++ b/include/Socket/IOCP.hpp @@ -4,6 +4,7 @@ #include "Socket/TCPSocket.hpp" #include "Socket/Log.hpp" #include +#include "precomp.hpp" #ifndef _WIN32 @@ -52,21 +53,81 @@ public: threadPool->enqueueJob(iocpWather, completionPort_, callback); }; #elif __linux__ - static void iocpWather(ThreadPool* threadPool, std::function callback) { - // DWORD tid = GetCurrentThreadId(); - // spdlog::debug("Waiting IO to complete on TID: {}.", tid); - // IOCPPASSINDATA* data; - // SOCKET sock; - // DWORD cbTransfrred; - // int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, (LPOVERLAPPED*)&data, INFINITE); - // if (retVal == 0 || cbTransfrred == 0) { - // spdlog::info("Client disconnected. [{}]", (std::string)(data->socket.remoteAddr)); - // delete data; - // threadPool->enqueueJob(iocpWather, completionPort_, callback); - // return; - // } - // threadPool->enqueueJob(callback, data); - // threadPool->enqueueJob(iocpWather, completionPort_, callback); + static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { + IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; + + pthread_t tid = pthread_self(); + + if (iocpData == nullptr) { + spdlog::error("invalid call on {}", tid); + return; + } + + spdlog::info("reading on tid: {} [{}]", tid, (std::string)iocpData->socket.remoteAddr); + + int redSize = 0; + int packetSize = iocpData->wsabuf.len; + int totalRedSize = 0; + + while (totalRedSize < packetSize) { + redSize = iocpData->socket.recv(iocpData->buf, packetSize - totalRedSize, 0); + + if (redSize <= 0) { + spdlog::info("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL); + delete iocpData; + return; + } + totalRedSize += redSize; + } + iocpData->sendbytes = packetSize - totalRedSize; + iocpData->recvbytes = totalRedSize; + threadPool->enqueueJob(callback, iocpData); + }; + static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { + IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; + + int packetSize = iocpData->wsabuf.len; + int totalSentSize = 0; + int sentSize = 0; + + spdlog::info("Sending to: [{}]", (std::string)iocpData->socket.remoteAddr); + + while (totalSentSize < packetSize) { + sentSize = iocpData->socket.send(iocpData->buf, packetSize - totalSentSize, 0); + + if (sentSize <= 0) { + spdlog::info("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL); + delete iocpData; + return; + } + totalSentSize += sentSize; + } + iocpData->recvbytes = packetSize - totalSentSize; + iocpData->sendbytes = totalSentSize; + threadPool->enqueueJob(callback, iocpData); + }; + static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function callback) { + struct epoll_event events[FD_SETSIZE]; + pthread_t tid = pthread_self(); + + spdlog::debug("epoll waiting on {}", tid); + int nready = ::epoll_wait(epollfd, events, FD_SETSIZE, -1); + + for (int i=0; i task(callback); + threadPool->enqueueJob(socketReader, current_event, epollfd, task); + } + else if (current_event.events & EPOLLOUT) { + std::function task(callback); + threadPool->enqueueJob(socketWriter, current_event, epollfd, task); + } + } + threadPool->enqueueJob(iocpWatcher, epollfd, callback); }; #endif @@ -78,26 +139,30 @@ public: if (completionPort_ == NULL) log::critical("CreateIoCompletionPort()"); #elif __linux__ - + epollfd_ = ::epoll_create(1); #endif auto boundFunc = [callback = std::move(callback)](ThreadPool* __IOCPThread, IOCPPASSINDATA* data) mutable { callback(__IOCPThread, data); - }; + }; +#ifdef _WIN32 int tCount = __IOCPThread->threadCount; spdlog::info("Resizing threadpool size to: {}", tCount * 2); __IOCPThread->respawnWorker(tCount * 2); - spdlog::info("Set IOCP Worker count to: {}", tCount); for (int i = 0; i < tCount; i++) { std::function task(boundFunc); - __IOCPThread->enqueueJob(iocpWather, task); + __IOCPThread->enqueueJob(iocpWather, completionPort_, task); } +#elif __linux__ + spdlog::info("Spawning 1 Epoll Waiter..."); + __IOCPThread->enqueueJob(iocpWatcher, epollfd_, boundFunc); +#endif } - void registerSocket(SOCKET sock); + void registerSocket(Chattr::IOCPPASSINDATA* data); int recv(Chattr::IOCPPASSINDATA* data); int send(Chattr::IOCPPASSINDATA* data, int __flags); @@ -109,7 +174,7 @@ private: #ifdef _WIN32 HANDLE completionPort_ = INVALID_HANDLE_VALUE; #elif __linux__ - int epollfd = -1; + int epollfd_ = -1; #endif }; diff --git a/include/Utils/Thread.hpp b/include/Utils/Thread.hpp index 94e5ae9..d76ee94 100644 --- a/include/Utils/Thread.hpp +++ b/include/Utils/Thread.hpp @@ -9,6 +9,9 @@ #endif #include #include +#include + +#include "Socket/Log.hpp" namespace Chattr { @@ -40,7 +43,7 @@ public: (!std::is_void_v>) Thread(_Callable&& __f, _Args&&... __args) { auto boundFunc = [this, __f, ... __args = std::move(__args)]() mutable { - returnValuePtr = new std::invoke_result_t<_Callable, _Args...>(__f(std::move(__args)...)); + returnValuePtr = std::make_shared>(__f(std::move(__args)...)); }; std::packaged_task* funcPtr = new std::packaged_task(std::move(boundFunc)); #ifdef _WIN32 @@ -60,8 +63,10 @@ public: #ifdef _WIN32 handle_ = (HANDLE)_beginthreadex(nullptr, 0, thread_func, funcPtr, 0, nullptr); #elif __linux__ - pthread_create(&handle_, NULL, thread_func, funcPtr); + int rc = pthread_create(&handle_, NULL, thread_func, funcPtr); #endif + if (handle_ <= 0 || rc != 0) + log::critical("pthread_create()"); } ~Thread(); @@ -69,7 +74,7 @@ public: #ifdef _WIN32 WaitForSingleObject(handle_, INFINITE); #elif __linux__ - pthread_join(handle_, &returnValuePtr); + pthread_join(handle_, nullptr); #endif } template @@ -78,7 +83,7 @@ public: #ifdef _WIN32 WaitForSingleObject(handle_, INFINITE); #elif __linux__ - pthread_join(handle_, &returnValuePtr); + pthread_join(handle_, nullptr); #endif return *static_cast<_RetType *>(returnValuePtr); } @@ -90,7 +95,7 @@ private: #elif __linux__ pthread_t handle_; #endif - void* returnValuePtr = nullptr; + std::shared_ptr returnValuePtr; bool detached = false; }; diff --git a/include/precomp.hpp b/include/precomp.hpp index 09deefa..6259efb 100644 --- a/include/precomp.hpp +++ b/include/precomp.hpp @@ -10,7 +10,9 @@ #elif __linux__ #include #include +#include #include +#include #include #include #define SOCKET int