From c0e0279e5c30556d05bafc7245c1573980684e22 Mon Sep 17 00:00:00 2001 From: HappyTanuki Date: Wed, 4 Jun 2025 02:33:00 +0900 Subject: [PATCH] =?UTF-8?q?todo:=20iocp=20recv=EC=95=88=EB=90=98=EB=8A=94?= =?UTF-8?q?=20=EB=AC=B8=EC=A0=9C=20=ED=95=B4=EA=B2=B0=ED=95=98=EA=B8=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- CMakeLists.txt | 2 +- Client/include/vulkan_engine/vulkan/engine.h | 7 +- Client/src/asteroid/main.cpp | 85 ++++++++++--- Client/src/vulkan_engine/vulkan/engine.cpp | 2 +- Server/CMakeLists.txt | 13 ++ Server/src/echoserver.cpp | 93 ++++++++++++++ impl/socket/address.cpp | 127 +++++++++++-------- impl/socket/iocp.cpp | 44 ++++--- impl/socket/tcp_socket.cpp | 20 ++- impl/utils/log.cpp | 2 +- impl/utils/thread_pool.cpp | 15 +-- include/socket/address.h | 9 +- include/socket/iocp.h | 88 ++++++++----- include/socket/wsa_manager.h | 9 -- include/utils/thread_pool.h | 4 +- 15 files changed, 363 insertions(+), 157 deletions(-) create mode 100644 Server/CMakeLists.txt create mode 100644 Server/src/echoserver.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index ad387d0..0c07ddc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,4 +27,4 @@ file(GLOB_RECURSE RootProjectSources CONFIGURE_DEPENDS ) add_subdirectory(Client) -# add_subdirectory(Server) \ No newline at end of file +add_subdirectory(Server) \ No newline at end of file diff --git a/Client/include/vulkan_engine/vulkan/engine.h b/Client/include/vulkan_engine/vulkan/engine.h index 73c635a..ce15f29 100644 --- a/Client/include/vulkan_engine/vulkan/engine.h +++ b/Client/include/vulkan_engine/vulkan/engine.h @@ -9,8 +9,9 @@ namespace veng { class Engine { public: - Engine(gsl::not_null vulkan_graphics) - : vulkan_graphics(vulkan_graphics) {} + Engine(gsl::not_null vulkan_graphics, + utils::ThreadPool* thread_pool) + : vulkan_graphics(vulkan_graphics), thread_pool_(thread_pool) {} void init(); @@ -39,7 +40,7 @@ class Engine { private: Loader asset_loader_; Physics physics_controller_; - utils::ThreadPool thread_pool_; + utils::ThreadPool* thread_pool_; glm::ivec2 window_size_ = {0, 0}; std::double_t last_frame_time_ = 0.0; diff --git a/Client/src/asteroid/main.cpp b/Client/src/asteroid/main.cpp index e4ca51a..ac4ecf9 100644 --- a/Client/src/asteroid/main.cpp +++ b/Client/src/asteroid/main.cpp @@ -2,36 +2,79 @@ #include "glfw/glfw_initialization.h" #include "glfw/glfw_monitor.h" #include "glfw/glfw_window.h" +#include "socket/iocp.h" +#include "socket/tcp_socket.h" +#include "socket/wsa_manager.h" +#include "utils/log.h" #include "vulkan_engine/vulkan/engine.h" #include "vulkan_engine/vulkan/graphics.h" -#include "socket/iocp.h" -std::int32_t main(std::int32_t argc, gsl::zstring* argv) { - Network::IOCP iocp; + std::int32_t main(std::int32_t argc, gsl::zstring* argv) { + Network::WSAManager wsamanager; + #if !defined(NDEBUG) + utils::setDefaultLogger(spdlog::level::level_enum::debug, "log.log", 1024, + 2); + #endif + utils::ThreadPool tp(0); + Network::IOCP iocp; + iocp.init(&tp, SessionProtocol::TCP); - const veng::GlfwInitialization _glfw; + Network::Address addr; + in6_addr in6addr; - veng::Window window("Vulkan Engine", {800, 600}); - window.TryMoveToMonitor(0); + addr.set(AF_INET6, "::1", 9010); - veng::Graphics graphics(&window); - veng::Engine engine(&graphics); + Network::TCPSocket sock; + sock.init(AF_INET6); + if (sock.connect(addr) == INVALID_SOCKET) { + spdlog::error("connect()"); + std::exit(EXIT_FAILURE); + } - engine.LoadModelAsset("assets/player.fbx", "player"); - engine.LoadModelAsset("assets/player_flame.fbx", "player_flame"); - engine.LoadModelAsset("assets/bullet.fbx", "bullet"); - engine.LoadModelAsset("assets/background.fbx", "background"); + Network::IOCPPASSINDATA* data = new Network::IOCPPASSINDATA(16 * 1024); + data->socket = std::make_shared(sock); + data->IOCPInstance = &iocp; + iocp.registerSocket(data); - engine.BeginPlay = BeginPlay; - engine.Tick = Tick; + std::vector send_data; + data->event = Network::IOCPEVENT::WRITE; + data->wsabuf.buf[0] = 'a'; + data->wsabuf.buf[1] = 'b'; + data->wsabuf.buf[2] = '\0'; + data->wsabuf.len = 3; + send_data.push_back(data); + iocp.send(sock.sock, &send_data); - engine.init(); + Sleep(10000); - while (!window.ShouldClose()) { - glfwPollEvents(); + Network::IOCPPASSINDATA* recv_data = new Network::IOCPPASSINDATA(16 * 1024); + recv_data->socket = std::make_shared(sock); + recv_data->IOCPInstance = &iocp; + iocp.recv(recv_data); - engine.Update(); - } + const veng::GlfwInitialization _glfw; - return EXIT_SUCCESS; -} + veng::Window window("Vulkan Engine", {800, 600}); + window.TryMoveToMonitor(0); + + veng::Graphics graphics(&window); + veng::Engine engine(&graphics, &tp); + + engine.LoadModelAsset("assets/player.fbx", "player"); + engine.LoadModelAsset("assets/player_flame.fbx", "player_flame"); + engine.LoadModelAsset("assets/bullet.fbx", "bullet"); + engine.LoadModelAsset("assets/background.fbx", "background"); + + engine.BeginPlay = BeginPlay; + engine.Tick = Tick; + + engine.init(); + + while (!window.ShouldClose()) { + glfwPollEvents(); + + engine.Update(); + } + + return EXIT_SUCCESS; + } diff --git a/Client/src/vulkan_engine/vulkan/engine.cpp b/Client/src/vulkan_engine/vulkan/engine.cpp index 6894698..162757b 100644 --- a/Client/src/vulkan_engine/vulkan/engine.cpp +++ b/Client/src/vulkan_engine/vulkan/engine.cpp @@ -168,7 +168,7 @@ void Engine::Update() { vulkan_graphics->RenderModel(it); } - physics_controller_.invokeOnColisionEvent(&thread_pool_, {models.data(), models.size()}); + physics_controller_.invokeOnColisionEvent(thread_pool_, {models.data(), models.size()}); vulkan_graphics->EndFrame(); } diff --git a/Server/CMakeLists.txt b/Server/CMakeLists.txt new file mode 100644 index 0000000..a1627a5 --- /dev/null +++ b/Server/CMakeLists.txt @@ -0,0 +1,13 @@ +cmake_minimum_required(VERSION 3.5) + +set(PROJECT_NAME "Server") + +project(${PROJECT_NAME}) + +add_executable(${PROJECT_NAME} + "${CMAKE_CURRENT_SOURCE_DIR}/src/echoserver.cpp" +) + +if(WIN32) + target_link_libraries(${PROJECT_NAME} PRIVATE ws2_32) +endif() \ No newline at end of file diff --git a/Server/src/echoserver.cpp b/Server/src/echoserver.cpp new file mode 100644 index 0000000..19d482e --- /dev/null +++ b/Server/src/echoserver.cpp @@ -0,0 +1,93 @@ +#pragma once + +#ifdef _WIN32 +#include +#include +#include +#include +#include +#define in_addr_t ULONG +#elif __linux__ +#include +#include +#include +#include +#include +#include +#include +#include +#define SOCKET int +#define INVALID_SOCKET -1 +#define SOCKET_ERROR -1 +#else +#error "이 플랫폼은 지원되지 않습니다." +#endif + +#include + +void err_quit(const char *msg) { + char *msgbuf = strerror(errno); + fprintf(stderr, "[%s] %s\n", msg, msgbuf); + exit(1); +} + +void err_display(const char *msg) { + char *msgbuf = strerror(errno); + fprintf(stderr, "[%s] %s\n", msg, msgbuf); +} + +#define LISTENIP "::" +#define LISTENPORT 9010 +#define BUFSIZE 100 + +int main(int argc, char *argv[]) { + WSADATA wsa; + if (WSAStartup(MAKEWORD(2, 2), &wsa) != 0) { + err_quit("WSAStartup()"); + return EXIT_FAILURE; + } + SOCKET listenSocket = socket(AF_INET6, SOCK_STREAM, 0); + if (listenSocket == INVALID_SOCKET) err_quit("socket()"); + + struct sockaddr_in6 serverAddr; + memset(&serverAddr, 0, sizeof(serverAddr)); + serverAddr.sin6_family = AF_INET6; + inet_pton(AF_INET6, LISTENIP, &serverAddr.sin6_addr); + serverAddr.sin6_port = htons(LISTENPORT); + + if (SOCKET_ERROR == + bind(listenSocket, (struct sockaddr *)&serverAddr, sizeof(serverAddr))) + err_quit("bind()"); + if (SOCKET_ERROR == listen(listenSocket, SOMAXCONN)) err_quit("listen()"); + + SOCKET client_sock; + struct sockaddr_in6 client_addr; + socklen_t addr_len; + + addr_len = sizeof(client_addr); + printf("Waiting for connection...\n"); + client_sock = + accept(listenSocket, (struct sockaddr *)&client_addr, &addr_len); + if (client_sock == INVALID_SOCKET) err_quit("accept()"); + printf("Connection established!\n"); + + char buf[BUFSIZE + 1]; + + while (2) { + int retVal = recv(client_sock, buf, BUFSIZE, 0); + if (retVal == SOCKET_ERROR) + err_display("recv()"); + else if (retVal == 0) + break; + + char ipv6str[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &client_addr, ipv6str, sizeof(ipv6str)); + buf[retVal] = '\0'; + printf("[TCP/%s:%d] %s\n", ipv6str, ntohs(client_addr.sin6_port), buf); + + if (SOCKET_ERROR == send(client_sock, buf, BUFSIZE, 0)) + err_display("send()"); + } + + WSACleanup(); +} diff --git a/impl/socket/address.cpp b/impl/socket/address.cpp index 81c4a09..5b30d35 100644 --- a/impl/socket/address.cpp +++ b/impl/socket/address.cpp @@ -1,13 +1,53 @@ #include "socket/address.h" -#include -#include -#include - #include namespace Network { +//void Address::set(int type, gsl::czstring presentationAddr, +// std::uint16_t port) { +// zeroFill(); +// setType(type); +// +// if (type == AF_INET) { +// ::inet_pton(AF_INET, presentationAddr, &addr_in.sin_addr); +// addr_in.sin_port = htons(port); +// } else if (type == AF_INET6) { +// ::inet_pton(AF_INET6, presentationAddr, &addr_in6.sin6_addr); +// addr_in6.sin6_port = htons(port); +// } +// +// BIO_ADDRINFO* res; +// if (!BIO_lookup_ex(presentationAddr, std::to_string(port).c_str(), +// BIO_LOOKUP_CLIENT, type, SOCK_DGRAM, 0, &res)) { +// ::BIO_ADDRINFO_free(res); +// throw std::runtime_error("can't resolve address"); +// } +// +// int sock = -1; +// for (const BIO_ADDRINFO* ai = res; ai != nullptr; +// ai = ::BIO_ADDRINFO_next(ai)) { +// sock = ::BIO_socket(BIO_ADDRINFO_family(ai), type, 0, 0); +// if (sock == -1) +// continue; +// else { +// auto bio_addr = ::BIO_ADDRINFO_address(ai); +// BIO_ADDR_rawaddress(bio_addr, &addr, (unsigned long long*)&length); +// bio_addr_info = ::BIO_ADDR_dup(bio_addr); +// +// break; +// } +// } +// +// if (sock != -1) +// ::close(sock); +// +// ::BIO_ADDRINFO_free(res); +// +// addr_in.sin_family = type; +// addr_in.sin_port = htons(port); +//} + Address::Address() { zeroFill(); } Address::Address(int type, gsl::czstring presentationAddr, std::uint16_t port) { @@ -19,56 +59,22 @@ void Address::zeroFill() { memset(&addr_in6, 0, sizeof(addr_in6)); } void Address::set(int type, gsl::czstring presentationAddr, std::uint16_t port) { zeroFill(); - setType(type); - BIO_ADDRINFO* res; - if (!BIO_lookup_ex(presentationAddr, std::to_string(port).c_str(), - BIO_LOOKUP_CLIENT, type, SOCK_DGRAM, 0, &res)) { - throw std::runtime_error("can't resolve address"); + if (type == AF_INET) { + addr_in.sin_family = AF_INET; + ::inet_pton(AF_INET, presentationAddr, &addr_in.sin_addr); + addr_in.sin_port = htons(port); + length = sizeof(sockaddr_in); + } else if (type == AF_INET6) { + addr_in6.sin6_family = AF_INET6; + ::inet_pton(AF_INET6, presentationAddr, &addr_in6.sin6_addr); + addr_in6.sin6_port = htons(port); + length = sizeof(sockaddr_in6); } - - int sock = -1; - for (const BIO_ADDRINFO* ai = res; ai != nullptr; - ai = ::BIO_ADDRINFO_next(ai)) { - sock = BIO_socket(BIO_ADDRINFO_family(ai), SOCK_DGRAM, 0, 0); - if (sock == -1) continue; - } - ::close(sock); - - addr_in.sin_family = type; - ::inet_pton(type, presentationAddr, &addr_in.sin_addr); - addr_in.sin_port = htons(port); -} - -void Address::set(int type, in_addr_t addr, std::uint16_t port) { - zeroFill(); - setType(type); - - addr_in.sin_family = type; - addr_in.sin_addr.s_addr = htonl(addr); - addr_in.sin_port = htons(port); -} - -void Address::set(int type, in_addr addr, std::uint16_t port) { - zeroFill(); - setType(type); - - addr_in.sin_family = type; - addr_in.sin_addr = addr; - addr_in.sin_port = htons(port); -} - -void Address::set(int type, in6_addr addr, std::uint16_t port) { - zeroFill(); - setType(type); - - addr_in6.sin6_family = type; - addr_in6.sin6_addr = addr; - addr_in6.sin6_port = htons(port); } void Address::setType(int type) { - family = type; + zeroFill(); if (type == AF_INET) length = sizeof(sockaddr_in); @@ -77,20 +83,29 @@ void Address::setType(int type) { } Address::operator std::string() { - char addrStr[INET6_ADDRSTRLEN]; + std::optional port = getPort(); - if (family != AF_INET && family != AF_INET6 || !getPort()) - return std::string(); + if (!port) return std::string(); - ::inet_ntop(family, &addr, addrStr, sizeof(addrStr)); + if (length == sizeof(addr_in)) { + char addrStr[INET_ADDRSTRLEN]; + ::inet_ntop(AF_INET, &addr_in.sin_addr, addrStr, sizeof(addrStr)); - return std::format("{}:{}", addrStr, getPort()); + return std::format("{}:{}", addrStr, port.value()); + } else if (length == sizeof(addr_in6)) { + char addrStr[INET6_ADDRSTRLEN]; + ::inet_ntop(AF_INET6, &addr_in6.sin6_addr, addrStr, sizeof(addrStr)); + + return std::format("{}:{}", addrStr, port.value()); + } + + return std::string(); } std::uint16_t Address::getPort() const { - if (family == AF_INET) + if (length == sizeof(addr_in)) return ntohs(addr_in.sin_port); - else if (family == AF_INET6) + else if (length == sizeof(addr_in6)) return ntohs(addr_in6.sin6_port); else return 0; diff --git a/impl/socket/iocp.cpp b/impl/socket/iocp.cpp index 52e912d..833835d 100644 --- a/impl/socket/iocp.cpp +++ b/impl/socket/iocp.cpp @@ -4,6 +4,11 @@ namespace Network { +IOCP::IOCP() { + gen_ = std::mt19937(rd_()); + jitterDist_ = std::uniform_int_distribution(-10, 10); +} + IOCP::~IOCP() { destruct(); } void IOCP::destruct() { @@ -22,15 +27,16 @@ void IOCP::registerSocket(IOCPPASSINDATA* data) { DWORD recvbytes = 0, flags = 0; ::WSARecv(data->socket->sock, &data->wsabuf, 1, &recvbytes, &flags, &data->overlapped, NULL); + #endif } -int IOCP::recv(IOCPPASSINDATA& data) { - SOCKET sock = data.socket->sock; +int IOCP::recv(IOCPPASSINDATA* data) { + SOCKET sock = data->socket->sock; std::lock_guard lock(*GetRecvQueueMutex_(sock)); auto queue = GetRecvQueue_(sock); - std::uint32_t left_data = data.wsabuf.len; + std::uint32_t left_data = data->wsabuf.len; std::uint32_t copied = 0; while (!queue->empty() && left_data != 0) { @@ -41,7 +47,7 @@ int IOCP::recv(IOCPPASSINDATA& data) { std::uint32_t available = front.first.size() - offset; std::uint32_t to_copy = (left_data < available) ? left_data : available; - ::memcpy(data.wsabuf.buf + copied, front.first.data() + offset, to_copy); + ::memcpy(data->wsabuf.buf + copied, front.first.data() + offset, to_copy); copied += to_copy; left_data -= to_copy; offset += to_copy; @@ -56,11 +62,13 @@ int IOCP::recv(IOCPPASSINDATA& data) { return copied; } -int IOCP::send(SOCKET sock, std::vector& data) { - std::lock_guard lock(*send_queue_mutex_[sock]); - for (auto it : data) { - it.event = IOCPEVENT::WRITE; - send_queue_[sock]->push_back(it); +int IOCP::send(SOCKET sock, std::vector* data) { + auto lk = GetSendQueueMutex_(sock); + auto queue = GetSendQueue_(sock); + std::lock_guard lock(*lk); + for (auto& it : *data) { + it->event = IOCPEVENT::WRITE; + queue->push_back(it); } IOCPThread_->enqueueJob( @@ -71,11 +79,11 @@ int IOCP::send(SOCKET sock, std::vector& data) { return 0; } -std::shared_ptr> IOCP::GetSendQueue_(SOCKET sock) { +std::shared_ptr> IOCP::GetSendQueue_(SOCKET sock) { std::lock_guard lock(socket_mod_mutex_); if (send_queue_.find(sock) == send_queue_.end()) { - send_queue_[sock] = std::make_shared>( - std::list()); + send_queue_[sock] = std::make_shared>( + std::list()); } return send_queue_[sock]; } @@ -118,14 +126,14 @@ void IOCP::packet_sender_(SOCKET sock) { while (!queue->empty()) { auto front = queue->front(); queue->pop_front(); - front.event = IOCPEVENT::WRITE; + front->event = IOCPEVENT::WRITE; int data_len = 0; if (proto_ == SessionProtocol::TLS || proto_ == SessionProtocol::QUIC) { - int ret = ::SSL_write(front.ssl, front.wsabuf.buf, front.wsabuf.len); + int ret = ::SSL_write(front->ssl, front->wsabuf.buf, front->wsabuf.len); if (ret <= 0) { - int err = ::SSL_get_error(front.ssl, ret); + int err = ::SSL_get_error(front->ssl, ret); if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) { queue->push_front(front); break; @@ -135,7 +143,7 @@ void IOCP::packet_sender_(SOCKET sock) { break; } - while ((data_len = ::BIO_read(wbio_, buf.data(), buf.size())) > 0) { + while ((data_len = ::BIO_read(front->wbio, buf.data(), buf.size())) > 0) { wsabuf.buf = buf.data(); wsabuf.len = data_len; @@ -143,8 +151,8 @@ void IOCP::packet_sender_(SOCKET sock) { } } else { - data_len = front.wsabuf.len; - wsabuf.buf = front.wsabuf.buf; + data_len = front->wsabuf.len; + wsabuf.buf = front->wsabuf.buf; wsabuf.len = data_len; ::WSASend(sock, &wsabuf, 1, &sendbytes, 0, nullptr, nullptr); } diff --git a/impl/socket/tcp_socket.cpp b/impl/socket/tcp_socket.cpp index 8e295c6..2bb5477 100644 --- a/impl/socket/tcp_socket.cpp +++ b/impl/socket/tcp_socket.cpp @@ -17,10 +17,22 @@ void TCPSocket::accept(TCPSocket &newSock, Address &__addr) { } int TCPSocket::connect(Address &serveraddr) { - int retVal = - ::connect(sock, (struct sockaddr *)&serveraddr.addr, serveraddr.length); + std::string addr_string = serveraddr; + int retVal = -1; + if (serveraddr.family == AF_INET) + retVal = ::connect(sock, (const sockaddr *)&serveraddr.addr_in, + serveraddr.length); + else + retVal = ::connect(sock, (const sockaddr *)&serveraddr.addr_in6, + serveraddr.length); memcpy(&remoteAddr, &serveraddr, sizeof(Address)); - if (retVal == INVALID_SOCKET) spdlog::error("connect()"); + if (retVal == INVALID_SOCKET) { +#ifdef _WIN32 + int err = WSAGetLastError(); + spdlog::error("connect() failed: WSA error {} (0x{:X})", err, err); +#endif + spdlog::error("connect()"); + } return retVal; } @@ -39,4 +51,4 @@ int TCPSocket::send(const void *__buf, size_t __n, int __flags) { return retVal; } -} // namespace Socket +} // namespace Network diff --git a/impl/utils/log.cpp b/impl/utils/log.cpp index 7fff2ae..5ff0fa8 100644 --- a/impl/utils/log.cpp +++ b/impl/utils/log.cpp @@ -19,7 +19,7 @@ void setDefaultLogger(spdlog::level::level_enum logLevel, sinks.push_back(std::make_shared()); #endif auto chatteringLogger = std::make_shared( - "Chattering Logger", begin(sinks), end(sinks)); + "Logger", begin(sinks), end(sinks)); chatteringLogger->set_level(logLevel); spdlog::set_default_logger(chatteringLogger); } diff --git a/impl/utils/thread_pool.cpp b/impl/utils/thread_pool.cpp index 9a98004..7fe58f9 100644 --- a/impl/utils/thread_pool.cpp +++ b/impl/utils/thread_pool.cpp @@ -1,15 +1,14 @@ #include "utils/thread_pool.h" + #include "precomp.h" namespace utils { -ThreadPool::ThreadPool() : ThreadPool(0) {} - -ThreadPool::ThreadPool(std::uint32_t numThreads) { init(numThreads); } +ThreadPool::ThreadPool(std::uint32_t numThreads) { init_(numThreads); } ThreadPool::~ThreadPool() { terminate(); } -void ThreadPool::init(std::uint32_t numThreads) { +void ThreadPool::init_(std::uint32_t numThreads) { int numCPU = numThreads; if (numThreads == 0) { #ifdef _WIN32 @@ -31,9 +30,9 @@ void ThreadPool::init(std::uint32_t numThreads) { } } threadCount = numCPU; - workers_.reserve(numCPU); + workers_.resize(numCPU); - while (numCPU--) workers_.emplace_back([this]() { this->Worker(); }); + while (numCPU--) workers_[numCPU] = std::thread([this]() { this->Worker(); }); } void ThreadPool::terminate() { @@ -47,7 +46,7 @@ void ThreadPool::terminate() { void ThreadPool::respawnWorker(std::uint32_t numThreads) { terminate(); terminate_ = false; - init(numThreads); + init_(numThreads); } void* ThreadPool::Worker() { @@ -79,4 +78,4 @@ void* ThreadPool::Worker() { return nullptr; } -} // namespace Chattr +} // namespace utils diff --git a/include/socket/address.h b/include/socket/address.h index 9013b8a..593e286 100644 --- a/include/socket/address.h +++ b/include/socket/address.h @@ -1,5 +1,9 @@ #pragma once +#include +#include +#include + namespace Network { struct Address { @@ -8,9 +12,6 @@ struct Address { void zeroFill(); void set(int type, gsl::czstring presentationAddr, std::uint16_t port); - void set(int type, in_addr_t addr, std::uint16_t port); - void set(int type, in_addr addr, std::uint16_t port); - void set(int type, in6_addr addr, std::uint16_t port); void setType(int type); operator std::string(); @@ -23,6 +24,8 @@ struct Address { struct sockaddr_in6 addr_in6; }; socklen_t length; + + BIO_ADDR* bio_addr_info; }; } // namespace Chattr diff --git a/include/socket/iocp.h b/include/socket/iocp.h index c6d83ff..aeeccec 100644 --- a/include/socket/iocp.h +++ b/include/socket/iocp.h @@ -6,6 +6,7 @@ #include #include #include +#include #include "socket.h" #include "utils/thread_pool.h" @@ -30,7 +31,7 @@ class IOCP; enum class IOCPEVENT { QUIT, READ, WRITE }; -struct IOCPPASSINDATA { +struct IOCPPASSINDATA { // 얘 double free 문제 있음.. OVERLAPPED overlapped; IOCPEVENT event; std::shared_ptr socket; @@ -49,6 +50,8 @@ struct IOCPPASSINDATA { std::memset(&overlapped, 0, sizeof(overlapped)); event = IOCPEVENT::QUIT; socket = nullptr; + rbio = nullptr; + wbio = nullptr; transferredbytes = 0; this->bufsize = bufsize; IOCPInstance = nullptr; @@ -56,25 +59,44 @@ struct IOCPPASSINDATA { wsabuf.buf = new char[bufsize]; wsabuf.len = bufsize; } - IOCPPASSINDATA(const IOCPPASSINDATA& other) - : event(other.event), - socket(other.socket), - transferredbytes(other.transferredbytes), - bufsize(other.bufsize), - IOCPInstance(other.IOCPInstance) -#ifdef __linux__ - , - sendQueue(other.sendQueue) -#endif - { + IOCPPASSINDATA(std::uint32_t bufsize, SSL_CTX* ctx) { std::memset(&overlapped, 0, sizeof(overlapped)); - wsabuf.buf = new char[other.bufsize]; - wsabuf.len = other.bufsize; - std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); + event = IOCPEVENT::QUIT; + socket = nullptr; + ssl = ::SSL_new(ctx); + rbio = ::BIO_new(::BIO_s_mem()); + wbio = ::BIO_new(::BIO_s_mem()); + ::SSL_set_bio(ssl, rbio, wbio); + transferredbytes = 0; + this->bufsize = bufsize; + IOCPInstance = nullptr; + + wsabuf.buf = new char[bufsize]; + wsabuf.len = bufsize; + } + + IOCPPASSINDATA(const IOCPPASSINDATA& other) { + if (this != &other) { + std::memset(&overlapped, 0, sizeof(overlapped)); + event = other.event; + socket = other.socket; + rbio = other.rbio; + wbio = other.wbio; + transferredbytes = other.transferredbytes; + bufsize = other.bufsize; + IOCPInstance = other.IOCPInstance; +#ifdef __linux__ + sendQueue = other.sendQueue; +#endif + wsabuf.buf = new char[other.bufsize]; + wsabuf.len = other.bufsize; + std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); + } } ~IOCPPASSINDATA() { if (wsabuf.buf != nullptr) delete[] wsabuf.buf; + wsabuf.buf = nullptr; } IOCPPASSINDATA& operator=(const IOCPPASSINDATA& other) { @@ -82,6 +104,8 @@ struct IOCPPASSINDATA { std::memset(&overlapped, 0, sizeof(overlapped)); event = other.event; socket = other.socket; + rbio = other.rbio; + wbio = other.wbio; transferredbytes = other.transferredbytes; bufsize = other.bufsize; IOCPInstance = other.IOCPInstance; @@ -99,16 +123,13 @@ struct IOCPPASSINDATA { class IOCP { public: + IOCP(); ~IOCP(); void init(utils::ThreadPool* __IOCPThread, SessionProtocol proto) { IOCPThread_ = __IOCPThread; proto_ = proto; - if (proto == SessionProtocol::TLS || proto == SessionProtocol::QUIC) { - rbio_ = ::BIO_new(::BIO_s_mem()); - wbio_ = ::BIO_new(::BIO_s_mem()); - } #ifdef _WIN32 completionPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); @@ -134,8 +155,8 @@ class IOCP { void registerSocket(IOCPPASSINDATA* data); // data는 한 가지 소켓에 보내는 패킷만 담아야 합니다 - int send(SOCKET sock, std::vector& data); - int recv(IOCPPASSINDATA& data); + int send(SOCKET sock, std::vector* data); + int recv(IOCPPASSINDATA* data); private: #ifdef _WIN32 @@ -143,11 +164,19 @@ class IOCP { IOCPPASSINDATA* data; SOCKET sock; DWORD cbTransfrred; + int jitter = jitterDist_(gen_); int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, - (LPOVERLAPPED*)&data, INFINITE); + (LPOVERLAPPED*)&data, 1000 + jitter); if (retVal == 0 || cbTransfrred == 0) { + DWORD lasterror = GetLastError(); + if (lasterror == WAIT_TIMEOUT) { + IOCPThread->enqueueJob([this](utils::ThreadPool* th, + std::uint8_t __) { iocpWatcher_(th); }, + 0); + return; + } data->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(data->socket->remoteAddr)); @@ -161,7 +190,7 @@ class IOCP { auto queue_list = GetRecvQueue_(data->socket->sock); if (data->event == IOCPEVENT::READ) { if (proto_ == SessionProtocol::TLS || proto_ == SessionProtocol::QUIC) { - ::BIO_write(rbio_, data->wsabuf.buf, cbTransfrred); + ::BIO_write(data->rbio, data->wsabuf.buf, cbTransfrred); while ((red_data = ::SSL_read(data->ssl, buf.data(), buf.size())) > 0) { queue_list->emplace_back(std::make_pair( @@ -188,22 +217,21 @@ class IOCP { #endif - std::shared_ptr> GetSendQueue_(SOCKET sock); + std::shared_ptr> GetSendQueue_(SOCKET sock); std::shared_ptr, std::uint32_t>>> GetRecvQueue_(SOCKET sock); std::shared_ptr GetSendQueueMutex_(SOCKET sock); std::shared_ptr GetRecvQueueMutex_(SOCKET sock); void packet_sender_(SOCKET sock); - - struct WSAManager* wsaManager = WSAManager::GetInstance(); utils::ThreadPool* IOCPThread_; - BIO* rbio_ = nullptr; - BIO* wbio_ = nullptr; - SessionProtocol proto_; + std::random_device rd_; + std::mt19937 gen_; + std::uniform_int_distribution jitterDist_; + // 밑의 unordered_map들에 키를 추가/제거 하려는 스레드는 이 뮤텍스를 잡아야 // 함. std::mutex socket_mod_mutex_; @@ -233,7 +261,7 @@ class IOCP { // 스레드가 데이터를 추가하고 미전송된 경우를 대비하여 스레드가 대기하도록 // 한다. 리눅스에서도 send_queue에 데이터를 쌓고 스레드가 대기하도록 한다. std::unordered_map> send_queue_mutex_; - std::unordered_map>> + std::unordered_map>> send_queue_; // 쓰기 싫었지만 쓰기 큐를 직렬화 하는 것 밖에 좋은 수가 생각이 안 남.. /*std::mutex send_queue_mutex_; diff --git a/include/socket/wsa_manager.h b/include/socket/wsa_manager.h index ebc3a9e..cf5b3f7 100644 --- a/include/socket/wsa_manager.h +++ b/include/socket/wsa_manager.h @@ -4,15 +4,6 @@ namespace Network { struct WSAManager { public: - static WSAManager* GetInstance() { - static WSAManager instance; - return &instance; - } - - WSAManager(const WSAManager&) = delete; - WSAManager& operator=(const WSAManager&) = delete; - - private: WSAManager(); ~WSAManager(); }; diff --git a/include/utils/thread_pool.h b/include/utils/thread_pool.h index 37f2d7e..0766490 100644 --- a/include/utils/thread_pool.h +++ b/include/utils/thread_pool.h @@ -12,11 +12,9 @@ namespace utils { class ThreadPool { public: - ThreadPool(); ThreadPool(std::uint32_t numThreads); ~ThreadPool(); - void init(std::uint32_t numThreads); void terminate(); void respawnWorker(std::uint32_t numThreads); @@ -66,6 +64,8 @@ class ThreadPool { int threadCount = 0; private: + void init_(std::uint32_t numThreads); + void* Worker(); std::condition_variable jobQueueCV_;