From 1703974310121ffca8b5194ff6008c0d386ef343 Mon Sep 17 00:00:00 2001 From: HappyTanuki Date: Tue, 29 Apr 2025 21:49:58 +0900 Subject: [PATCH] =?UTF-8?q?=EC=9E=84=EC=8B=9C=EC=A0=80=EC=9E=A5(=EB=B9=8C?= =?UTF-8?q?=EB=93=9C=20=EC=95=88=EB=90=A8)=20=EC=A1=B8=EB=A0=A4=EC=84=9C?= =?UTF-8?q?=20=EC=9E=90=EB=9F=AC=EA=B0=88=EA=B1=B0=EC=95=BC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Client/src/client.cpp | 1 - Server/src/server.cpp | 10 ++++++-- impl/Session/Session.cpp | 26 ++++++++++++++++++++ impl/Utils/Thread.cpp | 23 ++++++++++++++---- impl/Utils/ThreadPool.cpp | 47 ++++++++++++++++++++++++++++++++++++ include/Session/Session.hpp | 2 ++ include/Utils/Thread.hpp | 33 ++++++++++++++++++------- include/Utils/ThreadPool.hpp | 42 ++++++++++++++++++++++++++++++++ 8 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 impl/Session/Session.cpp create mode 100644 impl/Utils/ThreadPool.cpp create mode 100644 include/Utils/ThreadPool.hpp diff --git a/Client/src/client.cpp b/Client/src/client.cpp index 86fb07e..586b3ab 100644 --- a/Client/src/client.cpp +++ b/Client/src/client.cpp @@ -12,5 +12,4 @@ int main() { sock.connect(serveraddr); spdlog::info("Connection established from {}, {}", sock, (std::string)serveraddr); sock.send("Hello, World!", 14, 0); - sleep(100); } \ No newline at end of file diff --git a/Server/src/server.cpp b/Server/src/server.cpp index 4755c8e..19afe0f 100644 --- a/Server/src/server.cpp +++ b/Server/src/server.cpp @@ -3,6 +3,7 @@ #include "Socket/WSAManager.hpp" #include "Utils/ConfigManager.hpp" #include "Utils/Thread.hpp" +#include "Utils/ThreadPool.hpp" #include "Utils/StringTokenizer.hpp" #include "Session/Session.hpp" @@ -39,12 +40,17 @@ int main() { #endif spdlog::info("PID : {}", pid); + + Chattr::ThreadPool threadPool(3); + while (true) { spdlog::info("Waiting for connection..."); sock.accept(clientSock, clientAddr); - Chattr::Thread thread_(_TCPClient, std::move(clientSock), clientAddr); - thread_.detach(); + threadPool.enqueueJob(_TCPClient, std::move(clientSock), clientAddr); + + //Chattr::Thread thread_(_TCPClient, std::move(clientSock), clientAddr); + //thread_.detach(); } } diff --git a/impl/Session/Session.cpp b/impl/Session/Session.cpp new file mode 100644 index 0000000..3b0ee2b --- /dev/null +++ b/impl/Session/Session.cpp @@ -0,0 +1,26 @@ +#include "Session/Session.hpp" + +namespace Chattr { + +Session::Session() { +} + +Session::~Session() { +} + +bool Session::init() { + return false; +} + +void Session::destruct() { +} + +int Session::send() { + return -1; +} + +int Session::recv() { + return -1; +} + +} \ No newline at end of file diff --git a/impl/Utils/Thread.cpp b/impl/Utils/Thread.cpp index 68ed80a..147753e 100644 --- a/impl/Utils/Thread.cpp +++ b/impl/Utils/Thread.cpp @@ -1,17 +1,30 @@ #include "Utils/Thread.hpp" namespace Chattr { -Thread::~Thread() { - if (!detached) - join(); + +Thread::Thread(Thread&& other) noexcept { + other.detach(); } -void Thread::join() { +Thread& Thread::operator=(Thread&& other) noexcept { + other.detach(); + return *this; +} + +Thread::~Thread() { + if (!detached) { + spdlog::critical("There is not joined thread"); + std::exit(EXIT_FAILURE); + } +} + +void* Thread::join() { #ifdef _WIN32 WaitForSingleObject(handle_, INFINITE); #elif __linux__ - pthread_join(handle_, NULL); + pthread_join(handle_, returnValue); #endif + return returnValue; } void Thread::detach() { diff --git a/impl/Utils/ThreadPool.cpp b/impl/Utils/ThreadPool.cpp new file mode 100644 index 0000000..a8c7381 --- /dev/null +++ b/impl/Utils/ThreadPool.cpp @@ -0,0 +1,47 @@ +#include "Utils/ThreadPool.hpp" +#include "precomp.hpp" + +namespace Chattr { + +ThreadPool::ThreadPool(std::uint32_t numThreads) { + workers_.reserve(numThreads); + + while (numThreads--) + workers_.push_back([this]() -> void* { return this->Worker(); }); +} + +ThreadPool::~ThreadPool() { + terminate_ = true; + jobQueueCV_.notify_all(); + + for (auto& t : workers_) + t.join(); +} + +void* ThreadPool::Worker() { +#ifdef _WIN32 + DWORD pid = GetCurrentProcessId(); +#elif __linux__ + pid_t pid = getpid(); +#endif + spdlog::info("ThreadPool Worker : {}", pid); + while (!terminate_) { + std::unique_lock lock(jobQueueMutex); + spdlog::info("ThreadPool Worker : {} Waiting for a job", pid); + jobQueueCV_.wait(lock, [this]() { return !this->jobs_.empty() || terminate_; }); + if (this->jobs_.empty()) + return nullptr; + + auto jobPair = std::move(jobs_.front()); + jobs_.pop(); + lock.unlock(); + + spdlog::info("ThreadPool Worker : {} Executing a job", pid); + auto job = jobPair.first(); + jobPair.second() = (*job)(); + } + + return nullptr; +} + +} \ No newline at end of file diff --git a/include/Session/Session.hpp b/include/Session/Session.hpp index 56ccbc2..d55747d 100644 --- a/include/Session/Session.hpp +++ b/include/Session/Session.hpp @@ -1,5 +1,6 @@ #pragma once #include "Socket/TCPSocket.hpp" +#include "Utils/Snowflake.hpp" #include namespace Chattr { @@ -17,5 +18,6 @@ public: private: struct std::vector tcpSock_; struct std::vector udpSock_; + struct Snowflake sessId_; }; } \ No newline at end of file diff --git a/include/Utils/Thread.hpp b/include/Utils/Thread.hpp index a54c46c..26de644 100644 --- a/include/Utils/Thread.hpp +++ b/include/Utils/Thread.hpp @@ -12,39 +12,53 @@ namespace Chattr { +template +concept ReturnsVoidPtr = std::is_same_v< + std::invoke_result_t, + void* +>; + class Thread { public: #ifdef _WIN32 static unsigned __stdcall thread_func(LPVOID param) { - auto task(static_cast*>(param)); + // auto task = static_cast*>(param); + std::unique_ptr> task(static_cast*>(param)); (*task)(); - delete task; return 0; } #elif __linux__ static void* thread_func(void *param) { - auto task(static_cast*>(param)); + auto task(static_cast*>(param)); (*task)(); delete task; return 0; } #endif + Thread(Thread&&) noexcept; + Thread& operator=(Thread&&) noexcept; + + Thread(const Thread&) = delete; + Thread& operator=(Thread&) = delete; + template - requires (!std::is_same_v, Thread>) //복사 생성하면 안 되므로 + requires ReturnsVoidPtr<_Callable, _Args...> && (!std::is_same_v, Thread>) Thread(_Callable&& __f, _Args&&... __args) { - auto boundFunc = [__f = std::move(__f), ... __args = std::move(__args)]() mutable { - __f(std::move(__args)...); + auto boundFunc = [this, __f = std::move(__f), ... __args = std::move(__args)]() mutable -> void* { + void* ret = __f(std::move(__args)...); + returnValue = ret; + return ret; }; - auto funcPtr = new std::packaged_task(std::move(boundFunc)); + std::function funcPtr = std::move(boundFunc); #ifdef _WIN32 - handle_ = (HANDLE)_beginthreadex(nullptr, 0, thread_func, funcPtr, 0, nullptr); + handle_ = (HANDLE)_beginthreadex(nullptr, 0, thread_func, new std::function(std::move(funcPtr)), 0, nullptr); #elif __linux__ pthread_create(&handle_, NULL, thread_func, funcPtr); #endif } ~Thread(); - void join(); + void* join(); void detach(); private: @@ -53,6 +67,7 @@ private: #elif __linux__ pthread_t handle_; #endif + void* returnValue = nullptr; bool detached = false; }; diff --git a/include/Utils/ThreadPool.hpp b/include/Utils/ThreadPool.hpp new file mode 100644 index 0000000..8c4951f --- /dev/null +++ b/include/Utils/ThreadPool.hpp @@ -0,0 +1,42 @@ +#pragma once +#include "Thread.hpp" +#include +#include +#include +#include +#include +#include + +namespace Chattr { + +class ThreadPool { +public: + ThreadPool(std::uint32_t numThreads); + ~ThreadPool(); + + template + requires ReturnsVoidPtr<_Callable, _Args...> && (!std::is_same_v, Thread>) + int enqueueJob(_Callable&& __job, void* retVal, _Args&&... __args) { + if (terminate_) { + spdlog::error("Cannot run jobs on threads that terminating..."); + return -1; + } + + std::lock_guard lock(jobQueueMutex); + auto boundFunc = std::bind(std::forward<_Callable>(__job), std::forward<_Args>(__args)...); + std::function job(std::move(boundFunc)); + jobs_.push(job, retVal); + jobQueueCV_.notify_one(); + } + +private: + void* Worker(); + + std::condition_variable jobQueueCV_; + std::mutex jobQueueMutex; + std::queue, void*>> jobs_; + std::vector workers_; + bool terminate_ = false; +}; + +} \ No newline at end of file