도저히 여러 패킷 동시에 보내기를 할 수가 없다...

This commit is contained in:
2025-05-04 03:26:02 +09:00
parent 016d923d64
commit 63428ebf4d
4 changed files with 65 additions and 46 deletions

View File

@@ -16,7 +16,7 @@ public:
void init(_Callable _IOCPClient) { void init(_Callable _IOCPClient) {
auto config = ConfigManager::load(); auto config = ConfigManager::load();
log::setDefaultLogger(config.logLevel, config.logFileName, config.logfileSize, config.logfileCount); log::setDefaultLogger(config.logLevel, config.logFileName, config.logfileSize, config.logfileCount);
threadPool_.init(0); threadPool_.init(2);
iocp_.init(&threadPool_, _IOCPClient); iocp_.init(&threadPool_, _IOCPClient);
struct Address serveraddr; struct Address serveraddr;

View File

@@ -182,9 +182,9 @@ void ServerManager::processLoginRequestPacket(LoginRequestPacket loginRequestPac
::memcpy(loginResponsePacket.__data.yourId, &yourId, sizeof(Snowflake)); ::memcpy(loginResponsePacket.__data.yourId, &yourId, sizeof(Snowflake));
loginResponsePacket.convToN(); loginResponsePacket.convToN();
memcpy(data->wsabuf.buf, loginResponsePacket.serialized, 16); memcpy(data->wsabuf.buf, loginResponsePacket.serialized, 18);
data->sendbytes = 16; data->sendbytes = 18;
data->wsabuf.len = 16; data->wsabuf.len = 18;
data->IOCPInstance->send(data, 1, 0); data->IOCPInstance->send(data, 1, 0);
} }

View File

@@ -5,6 +5,20 @@
namespace Chattr { namespace Chattr {
IOCP::IOCP() {
}
IOCP::~IOCP() {
destruct();
}
void IOCP::destruct() {
#ifdef __linux__
close(epollfd_);
#endif
}
void IOCP::registerSocket(Chattr::IOCPPASSINDATA* data) { void IOCP::registerSocket(Chattr::IOCPPASSINDATA* data) {
#ifdef _WIN32 #ifdef _WIN32
HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0); HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0);

View File

@@ -9,7 +9,7 @@
#include "precomp.hpp" #include "precomp.hpp"
#ifndef _WIN32 #ifdef __linux__
typedef struct _OVERLAPPED { typedef struct _OVERLAPPED {
char dummy; char dummy;
@@ -67,7 +67,7 @@ public:
#elif __linux__ #elif __linux__
static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) { static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr;
pthread_t tid = pthread_self(); pthread_t tid = pthread_self();
if (iocpData == nullptr) { if (iocpData == nullptr) {
@@ -84,25 +84,25 @@ public:
int totalRedSize = 0; int totalRedSize = 0;
while (totalRedSize < headerSize) { while (totalRedSize < headerSize) {
redSize = iocpData->socket->recv(iocpData->buf, headerSize - totalRedSize, 0); redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, headerSize - totalRedSize, 0);
if (redSize <= 0) { if (redSize == SOCKET_ERROR) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::error("recv() [{}]", strerror(errno));
spdlog::trace("{}", strerror(errno)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
break; delete iocpData;
} else { return;
spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); }
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); else if (redSize == 0) {
delete iocpData; spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr);
return; ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
} delete iocpData;
return;
} }
totalRedSize += redSize; totalRedSize += redSize;
} }
Packet packet; Packet packet;
::memcpy(&packet.serialized, iocpData->buf, headerSize); ::memcpy(packet.serialized, iocpData->buf, headerSize);
redSize = 0; redSize = 0;
int dataLength = ntohs(packet.__data.packetLength); int dataLength = ntohs(packet.__data.packetLength);
@@ -110,17 +110,17 @@ public:
while (totalRedSize < dataLength + headerSize) { while (totalRedSize < dataLength + headerSize) {
redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0);
if (redSize <= 0) { if (redSize == SOCKET_ERROR) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::error("recv() [{}]", strerror(errno));
spdlog::trace("{}", strerror(errno)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
break; delete iocpData;
} else { return;
spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); }
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); else if (redSize == 0) {
delete iocpData; spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr);
return; ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
} delete iocpData;
return;
} }
totalRedSize += redSize; totalRedSize += redSize;
} }
@@ -141,25 +141,20 @@ public:
spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr);
int packetSize = iocpData->transferredbytes; int packetSize = iocpData->wsabuf.len;
int totalSentSize = 0; int totalSentSize = 0;
int sentSize = 0; int sentSize = 0;
spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr); spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr);
while (totalSentSize < packetSize) { while (totalSentSize < packetSize) {
sentSize = iocpData->socket->send(iocpData->buf, packetSize - totalSentSize, 0); sentSize = iocpData->socket->send(iocpData->buf + totalSentSize, packetSize - totalSentSize, 0);
if (sentSize <= 0) { if (sentSize == SOCKET_ERROR) {
if (errno == EAGAIN || errno == EWOULDBLOCK) { spdlog::error("send() [{}]", strerror(errno));
spdlog::trace("{}", strerror(errno)); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
break; delete iocpData;
} else { return;
spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr);
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
delete iocpData;
return;
}
} }
totalSentSize += sentSize; totalSentSize += sentSize;
} }
@@ -184,11 +179,16 @@ public:
std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(callback); std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(callback);
threadPool->enqueueJob(socketWriter, current_event, epollfd, task); threadPool->enqueueJob(socketWriter, current_event, epollfd, task);
} }
if (--nready <= 0)
break;
} }
threadPool->enqueueJob(iocpWatcher, epollfd, callback); threadPool->enqueueJob(iocpWatcher, epollfd, callback);
}; };
#endif #endif
IOCP();
~IOCP();
template<typename _Callable> template<typename _Callable>
void init(ThreadPool* __IOCPThread, _Callable&& callback) { void init(ThreadPool* __IOCPThread, _Callable&& callback) {
IOCPThread_ = __IOCPThread; IOCPThread_ = __IOCPThread;
@@ -215,24 +215,29 @@ public:
__IOCPThread->enqueueJob(iocpWather, completionPort_, task); __IOCPThread->enqueueJob(iocpWather, completionPort_, task);
} }
#elif __linux__ #elif __linux__
__IOCPThread->respawnWorker(__IOCPThread->threadCount + 1);
spdlog::info("Spawning 1 Epoll Waiter..."); spdlog::info("Spawning 1 Epoll Waiter...");
__IOCPThread->enqueueJob(iocpWatcher, epollfd_, boundFunc); __IOCPThread->enqueueJob(iocpWatcher, epollfd_, boundFunc);
#endif #endif
} }
void registerSocket(Chattr::IOCPPASSINDATA* data); void destruct();
int recv(Chattr::IOCPPASSINDATA* data, int bufferCount); void registerSocket(IOCPPASSINDATA* data);
int send(Chattr::IOCPPASSINDATA* data, int bufferCount, int __flags);
int recv(IOCPPASSINDATA* data, int bufferCount);
int send(IOCPPASSINDATA* data, int bufferCount, int __flags);
private: private:
struct Chattr::WSAManager wsaManager; struct WSAManager wsaManager;
ThreadPool* IOCPThread_; ThreadPool* IOCPThread_;
#ifdef _WIN32 #ifdef _WIN32
HANDLE completionPort_ = INVALID_HANDLE_VALUE; HANDLE completionPort_ = INVALID_HANDLE_VALUE;
#elif __linux__ #elif __linux__
int epollfd_ = -1; int epollfd_ = -1;
std::unordered_map<std::shared_ptr<TCPSocket>, std::mutex> writeMutex;
std::unordered_map<std::shared_ptr<TCPSocket>, std::queue<IOCPPASSINDATA*>> writeBuffer;
#endif #endif
}; };