다중 접속시에 문제가 생김

This commit is contained in:
2025-05-07 20:56:41 +09:00
parent 46abb2b2b9
commit 87cc1c7119
4 changed files with 97 additions and 86 deletions

View File

@@ -85,6 +85,7 @@
"valarray": "cpp", "valarray": "cpp",
"__tree": "cpp", "__tree": "cpp",
"map": "cpp", "map": "cpp",
"iostream": "cpp" "iostream": "cpp",
"any": "cpp"
} }
} }

View File

@@ -8,6 +8,8 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA
Chattr::Packet pack; Chattr::Packet pack;
int packetSize = data->transferredbytes; int packetSize = data->transferredbytes;
memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len);
if (data->event == IOCPEVENT::WRITE && data->transferredbytes >= data->wsabuf.len) { if (data->event == IOCPEVENT::WRITE && data->transferredbytes >= data->wsabuf.len) {
data->event = IOCPEVENT::READ; data->event = IOCPEVENT::READ;
data->wsabuf.len = 1500; data->wsabuf.len = 1500;
@@ -15,8 +17,6 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA
return; return;
} }
memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len);
std::uint16_t packetLength = ::ntohs(pack.__data.packetLength); std::uint16_t packetLength = ::ntohs(pack.__data.packetLength);
if (data->event == IOCPEVENT::READ && data->transferredbytes < packetLength + 8) { if (data->event == IOCPEVENT::READ && data->transferredbytes < packetLength + 8) {
@@ -226,12 +226,11 @@ void ServerManager::processRoomListRequest(RoomListRequestPacket roomListRequest
int packetLength = roomListResponsePacket.__data.packetLength; int packetLength = roomListResponsePacket.__data.packetLength;
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memcpy(ptr, data, sizeof(IOCPPASSINDATA));
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->socket = data->socket;
ptr->recvbytes = ptr->sendbytes = 0; ptr->recvbytes = ptr->sendbytes = 0;
ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.buf = ptr->buf;
ptr->wsabuf.len = packetLength + 8; ptr->wsabuf.len = packetLength + 8;
ptr->IOCPInstance = data->IOCPInstance;
roomListResponsePacket.convToN(); roomListResponsePacket.convToN();
memcpy(ptr->wsabuf.buf, roomListResponsePacket.serialized, packetLength + 8); memcpy(ptr->wsabuf.buf, roomListResponsePacket.serialized, packetLength + 8);
@@ -303,12 +302,11 @@ void ServerManager::processUsersListRequestPacket(UsersListRequestPacket usersLi
int packetLength = usersListResponsePacket.__data.packetLength; int packetLength = usersListResponsePacket.__data.packetLength;
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memcpy(ptr, data, sizeof(IOCPPASSINDATA));
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->socket = data->socket;
ptr->recvbytes = ptr->sendbytes = 0; ptr->recvbytes = ptr->sendbytes = 0;
ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.buf = ptr->buf;
ptr->wsabuf.len = packetLength + 8; ptr->wsabuf.len = packetLength + 8;
ptr->IOCPInstance = data->IOCPInstance;
usersListResponsePacket.convToN(); usersListResponsePacket.convToN();
memcpy(ptr->wsabuf.buf, usersListResponsePacket.serialized, packetLength + 8); memcpy(ptr->wsabuf.buf, usersListResponsePacket.serialized, packetLength + 8);
@@ -351,12 +349,12 @@ void ServerManager::processDataPostPacket(DataPostPacket dataPostPacket, IOCPPAS
for (auto dest : destinationSockets) { for (auto dest : destinationSockets) {
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memcpy(ptr, data, sizeof(IOCPPASSINDATA));
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->socket = dest; ptr->socket = dest;
ptr->recvbytes = ptr->sendbytes = 0; ptr->recvbytes = ptr->sendbytes = 0;
ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.buf = ptr->buf;
ptr->wsabuf.len = packetLength + 6; ptr->wsabuf.len = packetLength + 6;
ptr->IOCPInstance = data->IOCPInstance;
dataPostPacket.convToN(); dataPostPacket.convToN();
memcpy(ptr->wsabuf.buf, dataPostPacket.serialized, packetLength + 6); memcpy(ptr->wsabuf.buf, dataPostPacket.serialized, packetLength + 6);
@@ -399,12 +397,12 @@ void ServerManager::processContinuePacket(ContinuePacket continuePacket, IOCPPAS
for (auto dest : destinationSockets) { for (auto dest : destinationSockets) {
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memcpy(ptr, data, sizeof(IOCPPASSINDATA));
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->socket = dest; ptr->socket = dest;
ptr->recvbytes = ptr->sendbytes = 0; ptr->recvbytes = ptr->sendbytes = 0;
ptr->wsabuf.buf = ptr->buf; ptr->wsabuf.buf = ptr->buf;
ptr->wsabuf.len = packetLength + 6; ptr->wsabuf.len = packetLength + 6;
ptr->IOCPInstance = data->IOCPInstance;
continuePacket.convToN(); continuePacket.convToN();
memcpy(ptr->wsabuf.buf, continuePacket.serialized, packetLength + 6); memcpy(ptr->wsabuf.buf, continuePacket.serialized, packetLength + 6);
@@ -480,8 +478,6 @@ void ServerManager::run() {
while (true) { while (true) {
spdlog::info("Waiting for connection..."); spdlog::info("Waiting for connection...");
listenSock_.accept(clientSock_, clientAddr_); listenSock_.accept(clientSock_, clientAddr_);
bool enable = true;
clientSock_.setsockopt(SOL_SOCKET, SO_KEEPALIVE, (const char*)&enable, sizeof(enable));
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->socket = std::make_shared<TCPSocket>(std::move(clientSock_)); ptr->socket = std::make_shared<TCPSocket>(std::move(clientSock_));

View File

@@ -20,6 +20,7 @@ void IOCP::destruct() {
} }
void IOCP::registerSocket(IOCPPASSINDATA* data) { void IOCP::registerSocket(IOCPPASSINDATA* data) {
data->event = IOCPEVENT::READ;
#ifdef _WIN32 #ifdef _WIN32
HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0); HANDLE returnData = ::CreateIoCompletionPort((HANDLE)data->socket->sock, completionPort_, data->socket->sock, 0);
if (returnData == 0) if (returnData == 0)
@@ -30,7 +31,8 @@ void IOCP::registerSocket(IOCPPASSINDATA* data) {
// fcntl(data->socket->sock, F_SETFL, flags); // fcntl(data->socket->sock, F_SETFL, flags);
struct epoll_event ev; struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; ev.events = EPOLLIN | EPOLLONESHOT;
data->sendQueue = std::make_shared<std::queue<IOCPPASSINDATA*>>();
ev.data.ptr = data; ev.data.ptr = data;
int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket->sock, &ev); int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket->sock, &ev);
if (rc < 0) if (rc < 0)
@@ -45,7 +47,7 @@ int IOCP::recv(Chattr::IOCPPASSINDATA* data, int bufferCount) {
return ::WSARecv(data->socket->sock, &data->wsabuf, bufferCount, &recvbytes, &flags, &data->overlapped, NULL); return ::WSARecv(data->socket->sock, &data->wsabuf, bufferCount, &recvbytes, &flags, &data->overlapped, NULL);
#elif __linux__ #elif __linux__
struct epoll_event ev; struct epoll_event ev;
ev.events = EPOLLIN | EPOLLET | EPOLLONESHOT; ev.events = EPOLLIN | EPOLLONESHOT;
ev.data.ptr = data; ev.data.ptr = data;
return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev);
#endif #endif
@@ -58,11 +60,9 @@ int IOCP::send(Chattr::IOCPPASSINDATA* data, int bufferCount, int __flags, bool
return ::WSASend(data->socket->sock, &data->wsabuf, bufferCount, &sendbytes, __flags, &data->overlapped, NULL); return ::WSASend(data->socket->sock, &data->wsabuf, bufferCount, &sendbytes, __flags, &data->overlapped, NULL);
#elif __linux__ #elif __linux__
struct epoll_event ev; struct epoll_event ev;
if (client) ev.events = EPOLLOUT | EPOLLONESHOT;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLONESHOT;
else
ev.events = EPOLLOUT | EPOLLET | EPOLLONESHOT;
ev.data.ptr = data; ev.data.ptr = data;
data->sendQueue->push(data);
return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev);
#endif #endif
} }

View File

@@ -6,6 +6,7 @@
#include "Packet/Packet.hpp" #include "Packet/Packet.hpp"
#include <functional> #include <functional>
#include <vector> #include <vector>
#include <queue>
#include "precomp.hpp" #include "precomp.hpp"
@@ -42,6 +43,9 @@ struct IOCPPASSINDATA {
std::uint32_t transferredbytes; std::uint32_t transferredbytes;
WSABUF wsabuf; WSABUF wsabuf;
IOCP* IOCPInstance; IOCP* IOCPInstance;
#ifdef __linux__
std::shared_ptr<std::queue<IOCPPASSINDATA*>> sendQueue;
#endif
}; };
class IOCP { class IOCP {
@@ -66,100 +70,110 @@ public:
}; };
#elif __linux__ #elif __linux__
static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) { static void socketReader(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr;
pthread_t tid = pthread_self(); pthread_t tid = pthread_self();
if (iocpData == nullptr) { if (event.data.ptr == nullptr) {
spdlog::error("invalid call on {}", tid); spdlog::error("invalid call on {}", tid);
return; return;
} }
IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr;
std::lock_guard<std::mutex> lock(iocpData->socket->readMutex); std::lock_guard<std::mutex> lock(rootIocpData->socket->readMutex);
char peekBuffer[1];
while (rootIocpData->socket->recv(peekBuffer, 1, MSG_PEEK)) {
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
::memcpy(ptr, rootIocpData, sizeof(IOCPPASSINDATA));
::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED));
ptr->recvbytes = ptr->sendbytes = 0;
ptr->wsabuf.buf = ptr->buf;
ptr->wsabuf.len = 1500;
spdlog::trace("reading on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); int redSize = 0;
int headerSize = 8;
int totalRedSize = 0;
int redSize = 0; while (totalRedSize < headerSize) {
int headerSize = 8; redSize = ptr->socket->recv(ptr->buf + totalRedSize, headerSize - totalRedSize, 0);
int totalRedSize = 0;
if (redSize == SOCKET_ERROR) {
while (totalRedSize < headerSize) { spdlog::error("recv() [{}]", strerror(errno));
redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, headerSize - totalRedSize, 0); ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL);
delete ptr;
if (redSize == SOCKET_ERROR) { return;
spdlog::error("recv() [{}]", strerror(errno)); }
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); else if (redSize == 0) {
delete iocpData; spdlog::debug("Client disconnected. [{}]", (std::string)ptr->socket->remoteAddr);
return; ::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL);
delete ptr;
return;
}
totalRedSize += redSize;
} }
else if (redSize == 0) {
spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); Packet packet;
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); ::memcpy(packet.serialized, ptr->buf, headerSize);
delete iocpData;
return; redSize = 0;
int dataLength = ntohs(packet.__data.packetLength);
while (totalRedSize < dataLength + headerSize) {
redSize = ptr->socket->recv(ptr->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0);
if (redSize == SOCKET_ERROR) {
spdlog::error("recv() [{}]", strerror(errno));
::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL);
delete ptr;
return;
}
else if (redSize == 0) {
spdlog::debug("Client disconnected. [{}]", (std::string)ptr->socket->remoteAddr);
::epoll_ctl(epollfd, EPOLL_CTL_DEL, ptr->socket->sock, NULL);
delete ptr;
return;
}
totalRedSize += redSize;
} }
totalRedSize += redSize; ptr->transferredbytes = totalRedSize;
threadPool->enqueueJob(callback, ptr);
} }
Packet packet;
::memcpy(packet.serialized, iocpData->buf, headerSize);
redSize = 0;
int dataLength = ntohs(packet.__data.packetLength);
while (totalRedSize < dataLength + headerSize) {
redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0);
if (redSize == SOCKET_ERROR) {
spdlog::error("recv() [{}]", strerror(errno));
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
delete iocpData;
return;
}
else if (redSize == 0) {
spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr);
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL);
delete iocpData;
return;
}
totalRedSize += redSize;
}
iocpData->transferredbytes = totalRedSize;
threadPool->enqueueJob(callback, iocpData);
}; };
static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) { static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr;
pthread_t tid = pthread_self(); pthread_t tid = pthread_self();
if (iocpData == nullptr) { if (event.data.ptr == nullptr) {
spdlog::error("invalid call on {}", tid); spdlog::error("invalid call on {}", tid);
return; return;
} }
IOCPPASSINDATA* rootIocpData = (IOCPPASSINDATA*)event.data.ptr;
std::lock_guard<std::mutex> lock(iocpData->socket->writeMutex); std::lock_guard<std::mutex> lock(rootIocpData->socket->writeMutex);
auto queue = rootIocpData->sendQueue;
while (!queue->empty()) {
IOCPPASSINDATA* data = queue->front();
queue->pop();
spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); int packetSize = data->wsabuf.len;
int totalSentSize = 0;
int sentSize = 0;
int packetSize = iocpData->wsabuf.len; spdlog::trace("Sending to: [{}]", (std::string)data->socket->remoteAddr);
int totalSentSize = 0;
int sentSize = 0;
spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr); while (totalSentSize < packetSize) {
sentSize = data->socket->send(data->buf + totalSentSize, packetSize - totalSentSize, 0);
while (totalSentSize < packetSize) { if (sentSize == SOCKET_ERROR) {
sentSize = iocpData->socket->send(iocpData->buf + totalSentSize, packetSize - totalSentSize, 0); spdlog::error("send() [{}]", strerror(errno));
::epoll_ctl(epollfd, EPOLL_CTL_DEL, data->socket->sock, NULL);
if (sentSize == SOCKET_ERROR) { delete data;
spdlog::error("send() [{}]", strerror(errno)); return;
::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); }
delete iocpData; totalSentSize += sentSize;
return;
} }
totalSentSize += sentSize; data->transferredbytes = totalSentSize;
threadPool->enqueueJob(callback, data);
} }
iocpData->transferredbytes = totalSentSize;
threadPool->enqueueJob(callback, iocpData);
}; };
static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) { static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
struct epoll_event events[FD_SETSIZE]; struct epoll_event events[FD_SETSIZE];