#pragma once #include #include #include #include #include #include #include #include "socket.h" #include "utils/thread_pool.h" #include "wsa_manager.h" #ifdef __linux__ typedef struct _OVERLAPPED { char dummy; } OVERLAPPED; typedef struct __WSABUF { std::uint32_t len; char* buf; } WSABUF; #endif namespace Network { class IOCP; enum class IOCPEVENT { QUIT, READ, WRITE }; struct IOCPPASSINDATA { // 얘 double free 문제 있음.. OVERLAPPED overlapped; IOCPEVENT event; std::shared_ptr socket; SSL* ssl; BIO* rbio; // bio는 ssl별로 달라야 하므로 분리해야 함.. BIO* wbio; std::uint32_t transferredbytes; WSABUF wsabuf; std::uint32_t bufsize; IOCP* IOCPInstance; #ifdef __linux__ std::shared_ptr> sendQueue; #endif IOCPPASSINDATA(std::uint32_t bufsize) { std::memset(&overlapped, 0, sizeof(overlapped)); event = IOCPEVENT::QUIT; socket = nullptr; rbio = nullptr; wbio = nullptr; transferredbytes = 0; this->bufsize = bufsize; IOCPInstance = nullptr; wsabuf.buf = new char[bufsize]; wsabuf.len = bufsize; } IOCPPASSINDATA(std::uint32_t bufsize, SSL_CTX* ctx) { std::memset(&overlapped, 0, sizeof(overlapped)); event = IOCPEVENT::QUIT; socket = nullptr; ssl = ::SSL_new(ctx); rbio = ::BIO_new(::BIO_s_mem()); wbio = ::BIO_new(::BIO_s_mem()); ::SSL_set_bio(ssl, rbio, wbio); transferredbytes = 0; this->bufsize = bufsize; IOCPInstance = nullptr; wsabuf.buf = new char[bufsize]; wsabuf.len = bufsize; } IOCPPASSINDATA(const IOCPPASSINDATA& other) { if (this != &other) { std::memset(&overlapped, 0, sizeof(overlapped)); event = other.event; socket = other.socket; rbio = other.rbio; wbio = other.wbio; transferredbytes = other.transferredbytes; bufsize = other.bufsize; IOCPInstance = other.IOCPInstance; #ifdef __linux__ sendQueue = other.sendQueue; #endif wsabuf.buf = new char[other.bufsize]; wsabuf.len = other.bufsize; std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); } } ~IOCPPASSINDATA() { if (wsabuf.buf != nullptr) delete[] wsabuf.buf; wsabuf.buf = nullptr; } IOCPPASSINDATA& operator=(const IOCPPASSINDATA& other) { if (this != &other) { std::memset(&overlapped, 0, sizeof(overlapped)); event = other.event; socket = other.socket; rbio = other.rbio; wbio = other.wbio; transferredbytes = other.transferredbytes; bufsize = other.bufsize; IOCPInstance = other.IOCPInstance; #ifdef __linux__ sendQueue = other.sendQueue; #endif if (wsabuf.buf != nullptr) delete[] wsabuf.buf; wsabuf.buf = new char[other.bufsize]; wsabuf.len = other.bufsize; std::memcpy(wsabuf.buf, other.wsabuf.buf, other.wsabuf.len); } return *this; } }; class IOCP { public: IOCP(); ~IOCP(); void init(utils::ThreadPool* __IOCPThread, SessionProtocol proto) { IOCPThread_ = __IOCPThread; proto_ = proto; #ifdef _WIN32 completionPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); if (completionPort_ == NULL) { spdlog::critical("CreateIoCompletionPort()"); std::exit(EXIT_FAILURE); } int tCount = __IOCPThread->threadCount; spdlog::info("Resizing threadpool size to: {}", tCount * 2); __IOCPThread->respawnWorker(tCount * 2); for (int i = 0; i < tCount; i++) IOCPThread_->enqueueJob( [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, 0); #endif } void destruct(); void registerSocket(IOCPPASSINDATA* data); // data는 한 가지 소켓에 보내는 패킷만 담아야 합니다 int send(SOCKET sock, std::vector* data); int recv(IOCPPASSINDATA* data); private: #ifdef _WIN32 void iocpWatcher_(utils::ThreadPool* IOCPThread) { IOCPPASSINDATA* data; SOCKET sock; DWORD cbTransfrred; int jitter = jitterDist_(gen_); int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, (LPOVERLAPPED*)&data, 1000 + jitter); if (retVal == 0 || cbTransfrred == 0) { DWORD lasterror = GetLastError(); if (lasterror == WAIT_TIMEOUT) { IOCPThread->enqueueJob([this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, 0); return; } data->event = IOCPEVENT::QUIT; spdlog::debug("Disconnected. [{}]", (std::string)(data->socket->remoteAddr)); delete data; } else { data->transferredbytes = cbTransfrred; } std::vector buf(16384); // SSL_read최대 반환 크기 int red_data = 0; auto queue_list = GetRecvQueue_(data->socket->sock); if (data->event == IOCPEVENT::READ) { if (proto_ == SessionProtocol::TLS || proto_ == SessionProtocol::QUIC) { ::BIO_write(data->rbio, data->wsabuf.buf, cbTransfrred); while ((red_data = ::SSL_read(data->ssl, buf.data(), buf.size())) > 0) { queue_list->emplace_back(std::make_pair( std::vector(buf.begin(), buf.begin() + red_data), 0)); } } else { ::memcpy(buf.data(), data->wsabuf.buf, data->transferredbytes); queue_list->emplace_back(std::make_pair( std::vector(buf.begin(), buf.begin() + data->transferredbytes), 0)); } DWORD recvbytes = 0, flags = 0; ::WSARecv(data->socket->sock, &data->wsabuf, 1, &recvbytes, &flags, &data->overlapped, NULL); } else { // WRITE 시, 무시한다. delete data; } IOCPThread->enqueueJob( [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, 0); } #elif __linux__ #endif std::shared_ptr> GetSendQueue_(SOCKET sock); std::shared_ptr, std::uint32_t>>> GetRecvQueue_(SOCKET sock); std::shared_ptr GetSendQueueMutex_(SOCKET sock); std::shared_ptr GetRecvQueueMutex_(SOCKET sock); void packet_sender_(SOCKET sock); utils::ThreadPool* IOCPThread_; SessionProtocol proto_; std::random_device rd_; std::mt19937 gen_; std::uniform_int_distribution jitterDist_; // 밑의 unordered_map들에 키를 추가/제거 하려는 스레드는 이 뮤텍스를 잡아야 // 함. std::mutex socket_mod_mutex_; // 각 소켓별 뮤텍스. 다른 스레드가 읽는 중이라면 수신 순서 보장을 위해 다른 // 스레드는 수신을 포기하고 이전 스레드에 전송을 위임해야 함. (ONESHOT으로 // 인해 발생하지 않을 것으로 기대되는 행동이기는 하나 보장되지 않을 수 // 있으므로) 수신 스레드는 epoll이나 iocp를 통해 적당히 수신받아 이 큐를 // 채워야 함. (항시 socket에 대한 큐에 대해 읽기 시도가 행해질 수 있어야 // 한다는 뜻임) EPOLLIN에 대해서는 ONESHOT으로 등록해 놓고 읽는 도중에 버퍼에 // 새 값이 채워질 수 있으므로 읽기가 끝나고 나서 재등록한다 std::unordered_map> recv_queue_mutex_; // 각 소켓별 패킷, int는 그 vector의 시작 인덱스(vector의 끝까지 다 읽었으면 // 그 vector는 list에서 삭제되어야 하며, 데이터는 평문으로 변환하여 저장한다) std::unordered_map< SOCKET, std::shared_ptr, std::uint32_t>>>> recv_queue_; // 각 소켓별 뮤텍스. 다른 스레드가 쓰는 중이라면 송신 순서 보장을 위해 다른 // 스레드는 대기한다. condition variable을 쓰지 않는 이유는 소켓 갯수만큼의 // 스레드가 대기할 수 없기 때문이며, 송신 등록 시에 적당히 송신 스레드를 // 스폰해야 함. 이 변수는 항상 송신 스레드에 의해 관리되어야만 하며, // 윈도우에서는 송신을 iocp가 대행하기 때문에 이 큐가 필요 없는 것처럼 느껴질 // 수 있으나, 송신 순서를 보장하기 위해 WSASend를 한 스레드가 연속해서 // 호출해야만 하는데 이는 한번 쓰기 호출 시에 송신 중 데이터를 추가하려는 // 스레드가 데이터를 추가하고 미전송된 경우를 대비하여 스레드가 대기하도록 // 한다. 리눅스에서도 send_queue에 데이터를 쌓고 스레드가 대기하도록 한다. std::unordered_map> send_queue_mutex_; std::unordered_map>> send_queue_; // 쓰기 싫었지만 쓰기 큐를 직렬화 하는 것 밖에 좋은 수가 생각이 안 남.. /*std::mutex send_queue_mutex_; std::condition_variable cv_send_queue_; std::list send_queue_;*/ #ifdef _WIN32 HANDLE completionPort_ = INVALID_HANDLE_VALUE; #elif __linux__ #endif }; } // namespace Network