일단 iocp는 완성한 듯 윈도우에서만 되면 될 듯

This commit is contained in:
2025-05-02 19:30:37 +09:00
parent 7fabc98456
commit 181d30ff28
10 changed files with 147 additions and 53 deletions

View File

@@ -4,6 +4,7 @@
#include "Socket/TCPSocket.hpp"
#include "Socket/Log.hpp"
#include <functional>
#include "precomp.hpp"
#ifndef _WIN32
@@ -52,21 +53,81 @@ public:
threadPool->enqueueJob(iocpWather, completionPort_, callback);
};
#elif __linux__
static void iocpWather(ThreadPool* threadPool, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
// DWORD tid = GetCurrentThreadId();
// spdlog::debug("Waiting IO to complete on TID: {}.", tid);
// IOCPPASSINDATA* data;
// SOCKET sock;
// DWORD cbTransfrred;
// int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, (LPOVERLAPPED*)&data, INFINITE);
// if (retVal == 0 || cbTransfrred == 0) {
// spdlog::info("Client disconnected. [{}]", (std::string)(data->socket.remoteAddr));
// delete data;
// threadPool->enqueueJob(iocpWather, completionPort_, callback);
// return;
// }
// threadPool->enqueueJob(callback, data);
// threadPool->enqueueJob(iocpWather, completionPort_, callback);
static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr;
pthread_t tid = pthread_self();
if (iocpData == nullptr) {
spdlog::error("invalid call on {}", tid);
return;
}
spdlog::info("reading on tid: {} [{}]", tid, (std::string)iocpData->socket.remoteAddr);
int redSize = 0;
int packetSize = iocpData->wsabuf.len;
int totalRedSize = 0;
while (totalRedSize < packetSize) {
redSize = iocpData->socket.recv(iocpData->buf, packetSize - totalRedSize, 0);
if (redSize <= 0) {
spdlog::info("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr);
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL);
delete iocpData;
return;
}
totalRedSize += redSize;
}
iocpData->sendbytes = packetSize - totalRedSize;
iocpData->recvbytes = totalRedSize;
threadPool->enqueueJob(callback, iocpData);
};
static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr;
int packetSize = iocpData->wsabuf.len;
int totalSentSize = 0;
int sentSize = 0;
spdlog::info("Sending to: [{}]", (std::string)iocpData->socket.remoteAddr);
while (totalSentSize < packetSize) {
sentSize = iocpData->socket.send(iocpData->buf, packetSize - totalSentSize, 0);
if (sentSize <= 0) {
spdlog::info("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr);
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL);
delete iocpData;
return;
}
totalSentSize += sentSize;
}
iocpData->recvbytes = packetSize - totalSentSize;
iocpData->sendbytes = totalSentSize;
threadPool->enqueueJob(callback, iocpData);
};
static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
struct epoll_event events[FD_SETSIZE];
pthread_t tid = pthread_self();
spdlog::debug("epoll waiting on {}", tid);
int nready = ::epoll_wait(epollfd, events, FD_SETSIZE, -1);
for (int i=0; i<nready; i++) {
struct epoll_event current_event = events[i];
if (current_event.events & EPOLLIN) {
std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(callback);
threadPool->enqueueJob(socketReader, current_event, epollfd, task);
}
else if (current_event.events & EPOLLOUT) {
std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(callback);
threadPool->enqueueJob(socketWriter, current_event, epollfd, task);
}
}
threadPool->enqueueJob(iocpWatcher, epollfd, callback);
};
#endif
@@ -78,26 +139,30 @@ public:
if (completionPort_ == NULL)
log::critical("CreateIoCompletionPort()");
#elif __linux__
epollfd_ = ::epoll_create(1);
#endif
auto boundFunc = [callback = std::move(callback)](ThreadPool* __IOCPThread, IOCPPASSINDATA* data) mutable {
callback(__IOCPThread, data);
};
};
#ifdef _WIN32
int tCount = __IOCPThread->threadCount;
spdlog::info("Resizing threadpool size to: {}", tCount * 2);
__IOCPThread->respawnWorker(tCount * 2);
spdlog::info("Set IOCP Worker count to: {}", tCount);
for (int i = 0; i < tCount; i++) {
std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(boundFunc);
__IOCPThread->enqueueJob(iocpWather, task);
__IOCPThread->enqueueJob(iocpWather, completionPort_, task);
}
#elif __linux__
spdlog::info("Spawning 1 Epoll Waiter...");
__IOCPThread->enqueueJob(iocpWatcher, epollfd_, boundFunc);
#endif
}
void registerSocket(SOCKET sock);
void registerSocket(Chattr::IOCPPASSINDATA* data);
int recv(Chattr::IOCPPASSINDATA* data);
int send(Chattr::IOCPPASSINDATA* data, int __flags);
@@ -109,7 +174,7 @@ private:
#ifdef _WIN32
HANDLE completionPort_ = INVALID_HANDLE_VALUE;
#elif __linux__
int epollfd = -1;
int epollfd_ = -1;
#endif
};