diff --git a/Client/src/asteroid/main.cpp b/Client/src/asteroid/main.cpp index 3afaaa4..75f8d7f 100644 --- a/Client/src/asteroid/main.cpp +++ b/Client/src/asteroid/main.cpp @@ -18,7 +18,7 @@ #endif utils::ThreadPool tp(0); Network::IOCP iocp; - iocp.init(&tp, SessionProtocol::TLS); + iocp.init(&tp, SessionProtocol::TCP); Network::Address addr; in6_addr in6addr; @@ -41,9 +41,15 @@ data->event = Network::IOCPEVENT::WRITE; data->wsabuf.buf[0] = 'a'; data->wsabuf.buf[1] = 'b'; - data->wsabuf.buf[2] = '\0'; + data->wsabuf.buf[2] = ' '; data->wsabuf.len = 3; send_data.push_back(data); + Network::IOCPPASSINDATA* data2 = new Network::IOCPPASSINDATA(16 * 1024); + data2->wsabuf.buf[0] = 'b'; + data2->wsabuf.buf[1] = 'a'; + data2->wsabuf.buf[2] = '\0'; + data2->wsabuf.len = 3; + send_data.push_back(data2); iocp.send(sock.sock, &send_data); Network::IOCPPASSINDATA* recv_data = new Network::IOCPPASSINDATA(16 * 1024); @@ -53,6 +59,29 @@ spdlog::info("recv_data: {}", recv_data->wsabuf.buf); + std::vector send_data2; + data->event = Network::IOCPEVENT::WRITE; + data->wsabuf.buf[0] = 'c'; + data->wsabuf.buf[1] = 'd'; + data->wsabuf.buf[2] = ' '; + data->wsabuf.len = 3; + send_data2.push_back(data); + data2 = new Network::IOCPPASSINDATA(16 * 1024); + data2->wsabuf.buf[0] = 'd'; + data2->wsabuf.buf[1] = 'c'; + data2->wsabuf.buf[2] = '\0'; + data2->wsabuf.len = 3; + send_data2.push_back(data2); + iocp.send(sock.sock, &send_data2); + + recv_data = new Network::IOCPPASSINDATA(16 * 1024); + recv_data->socket = std::make_shared(sock); + recv_data->IOCPInstance = &iocp; + while (!iocp.recv(recv_data)); // 어떤 데이터를 읽는걸 보장받고 싶다면 그냥 + // 스린락 걸어버리기. + + spdlog::info("recv_data: {}", recv_data->wsabuf.buf); + const veng::GlfwInitialization _glfw; veng::Window window("Vulkan Engine", {800, 600}); diff --git a/impl/socket/iocp.cpp b/impl/socket/iocp.cpp index b627e3a..6a63e70 100644 --- a/impl/socket/iocp.cpp +++ b/impl/socket/iocp.cpp @@ -11,6 +11,29 @@ IOCP::IOCP() { IOCP::~IOCP() { destruct(); } +void IOCP::init(utils::ThreadPool* __IOCPThread, SessionProtocol proto) { + IOCPThread_ = __IOCPThread; + proto_ = proto; + +#ifdef _WIN32 + completionPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); + if (completionPort_ == NULL) { + spdlog::critical("CreateIoCompletionPort()"); + std::exit(EXIT_FAILURE); + } + int tCount = __IOCPThread->threadCount; + + spdlog::info("Resizing threadpool size to: {}", tCount * 2); + + __IOCPThread->respawnWorker(tCount * 2); + + for (int i = 0; i < tCount; i++) + IOCPThread_->enqueueJob( + [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, + 0); +#endif +} + void IOCP::destruct() { #ifdef __linux__ @@ -40,7 +63,7 @@ void IOCP::registerSocket(IOCPPASSINDATA* data) { #endif } -int IOCP::recv(IOCPPASSINDATA* data) { +int IOCP::recv(IOCPPASSINDATA* data) { //읽은 바이트수가 무조건 100임? 왜..? SOCKET sock = data->socket->sock; std::lock_guard lock(*GetRecvQueueMutex_(sock)); auto queue = GetRecvQueue_(sock); @@ -88,6 +111,73 @@ int IOCP::send(SOCKET sock, std::vector* data) { return 0; } +int IOCP::GetRecvedPacketCount(SOCKET sock) { + std::lock_guard lock(socket_mod_mutex_); + auto queue = GetRecvQueue_(sock); + return queue->size(); +} + +void IOCP::iocpWatcher_(utils::ThreadPool* IOCPThread) { + IOCPPASSINDATA* data; + SOCKET sock; + DWORD cbTransfrred; + int jitter = jitterDist_(gen_); + int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, + (PULONG_PTR)&sock, + (LPOVERLAPPED*)&data, 1000 + jitter); + + if (retVal == 0 || cbTransfrred == 0) { + DWORD lasterror = GetLastError(); + if (lasterror == WAIT_TIMEOUT) { + IOCPThread->enqueueJob( + [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, + 0); + return; + } + data->event = IOCPEVENT::QUIT; + spdlog::debug("Disconnected. [{}]", + (std::string)(data->socket->remoteAddr)); + delete data; + } else { + data->transferredbytes = cbTransfrred; + } + + std::vector buf(16384); // SSL_read최대 반환 크기 + int red_data = 0; + std::lock_guard lock(*GetRecvQueueMutex_(sock)); + auto queue_list = GetRecvQueue_(data->socket->sock); + if (data->event == IOCPEVENT::READ) { + if (proto_ == SessionProtocol::TLS || proto_ == SessionProtocol::QUIC) { + ::BIO_write(::SSL_get_rbio(data->ssl.get()), data->wsabuf.buf, + cbTransfrred); + + while ((red_data = ::SSL_read(data->ssl.get(), buf.data(), buf.size())) > + 0) { + queue_list->emplace_back(std::make_pair( + std::vector(buf.begin(), buf.begin() + red_data), 0)); + } + } else { + ::memcpy(buf.data(), data->wsabuf.buf, data->transferredbytes); + queue_list->emplace_back(std::make_pair( + std::vector(buf.begin(), buf.begin() + data->transferredbytes), + 0)); + } + DWORD recvbytes = 0, flags = 0; + + IOCPPASSINDATA* recv_data = new IOCPPASSINDATA(data->bufsize); + recv_data->event = IOCPEVENT::READ; + recv_data->socket = data->socket; + + delete data; + ::WSARecv(recv_data->socket->sock, &recv_data->wsabuf, 1, &recvbytes, + &flags, &recv_data->overlapped, NULL); + } else { // WRITE 시, 무시한다. + delete data; + } + IOCPThread->enqueueJob( + [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, 0); +} + std::shared_ptr> IOCP::GetSendQueue_(SOCKET sock) { std::lock_guard lock(socket_mod_mutex_); if (send_queue_.find(sock) == send_queue_.end()) { diff --git a/include/socket/iocp.h b/include/socket/iocp.h index 8ca34fa..4c7ab88 100644 --- a/include/socket/iocp.h +++ b/include/socket/iocp.h @@ -60,7 +60,7 @@ struct IOCPPASSINDATA { std::memset(&overlapped, 0, sizeof(overlapped)); event = IOCPEVENT::QUIT; socket = nullptr; - ssl = std::make_shared(::SSL_new(ctx), ::SSL_free); + ssl = std::shared_ptr(::SSL_new(ctx), ::SSL_free); ::SSL_set_bio(ssl.get(), ::BIO_new(::BIO_s_mem()), ::BIO_new(::BIO_s_mem())); transferredbytes = 0; @@ -118,100 +118,21 @@ class IOCP { IOCP(); ~IOCP(); - void init(utils::ThreadPool* __IOCPThread, SessionProtocol proto) { - IOCPThread_ = __IOCPThread; - proto_ = proto; - -#ifdef _WIN32 - completionPort_ = - ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if (completionPort_ == NULL) { - spdlog::critical("CreateIoCompletionPort()"); - std::exit(EXIT_FAILURE); - } - int tCount = __IOCPThread->threadCount; - - spdlog::info("Resizing threadpool size to: {}", tCount * 2); - - __IOCPThread->respawnWorker(tCount * 2); - - for (int i = 0; i < tCount; i++) - IOCPThread_->enqueueJob( - [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, - 0); -#endif - } + void init(utils::ThreadPool* __IOCPThread, SessionProtocol proto); void destruct(); void registerSocket(IOCPPASSINDATA* data); + int recv(IOCPPASSINDATA* data); // data는 한 가지 소켓에 보내는 패킷만 담아야 합니다 int send(SOCKET sock, std::vector* data); - int recv(IOCPPASSINDATA* data); + + int GetRecvedPacketCount(SOCKET sock); private: #ifdef _WIN32 - void iocpWatcher_(utils::ThreadPool* IOCPThread) { - IOCPPASSINDATA* data; - SOCKET sock; - DWORD cbTransfrred; - int jitter = jitterDist_(gen_); - int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, - (PULONG_PTR)&sock, - (LPOVERLAPPED*)&data, 1000 + jitter); - - if (retVal == 0 || cbTransfrred == 0) { - DWORD lasterror = GetLastError(); - if (lasterror == WAIT_TIMEOUT) { - IOCPThread->enqueueJob([this](utils::ThreadPool* th, - std::uint8_t __) { iocpWatcher_(th); }, - 0); - return; - } - data->event = IOCPEVENT::QUIT; - spdlog::debug("Disconnected. [{}]", - (std::string)(data->socket->remoteAddr)); - delete data; - } else { - data->transferredbytes = cbTransfrred; - } - - std::vector buf(16384); // SSL_read최대 반환 크기 - int red_data = 0; - auto queue_list = GetRecvQueue_(data->socket->sock); - if (data->event == IOCPEVENT::READ) { - if (proto_ == SessionProtocol::TLS || proto_ == SessionProtocol::QUIC) { - ::BIO_write(::SSL_get_rbio(data->ssl.get()), data->wsabuf.buf, - cbTransfrred); - - while ((red_data = - ::SSL_read(data->ssl.get(), buf.data(), buf.size())) > 0) { - queue_list->emplace_back(std::make_pair( - std::vector(buf.begin(), buf.begin() + red_data), 0)); - } - } else { - ::memcpy(buf.data(), data->wsabuf.buf, data->transferredbytes); - queue_list->emplace_back(std::make_pair( - std::vector(buf.begin(), - buf.begin() + data->transferredbytes), - 0)); - } - DWORD recvbytes = 0, flags = 0; - - IOCPPASSINDATA* recv_data = new IOCPPASSINDATA(data->bufsize); - recv_data->socket = data->socket; - - delete data; - ::WSARecv(recv_data->socket->sock, &recv_data->wsabuf, 1, &recvbytes, - &flags, &recv_data->overlapped, NULL); - } else { // WRITE 시, 무시한다. - delete data; - } - IOCPThread->enqueueJob( - [this](utils::ThreadPool* th, std::uint8_t __) { iocpWatcher_(th); }, - 0); - } + void iocpWatcher_(utils::ThreadPool* IOCPThread); #elif __linux__ #endif