From 6ad787eed76e30b714f5997e91f1ffaf03896d7c Mon Sep 17 00:00:00 2001 From: HappyTanuki Date: Sat, 3 May 2025 17:58:46 +0900 Subject: [PATCH] =?UTF-8?q?=EB=91=98=20=EB=8B=A4=20=EB=8F=99=EC=8B=9C=20?= =?UTF-8?q?=EB=94=94=EB=B2=84=EA=B9=85=EC=9D=84=20=EC=9C=84=ED=95=B4=20pus?= =?UTF-8?q?h?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Client/src/client.cpp | 68 +++++++- .../include/ServerManager/ServerManager.hpp | 5 +- Server/src/ServerManager/ServerManager.cpp | 155 +++++++++++------- impl/Socket/IOCP.cpp | 12 +- impl/Socket/Socket.cpp | 2 + impl/Utils/GenerateID.cpp | 32 ++++ impl/Utils/ThreadPool.cpp | 6 +- include/Packet/Packet.hpp | 30 +++- include/Socket/IOCP.hpp | 87 +++++++--- include/Socket/Socket.hpp | 3 + include/Utils/Snowflake.hpp | 2 + 11 files changed, 305 insertions(+), 97 deletions(-) create mode 100644 impl/Utils/GenerateID.cpp diff --git a/Client/src/client.cpp b/Client/src/client.cpp index dd9aa1e..b173c3a 100644 --- a/Client/src/client.cpp +++ b/Client/src/client.cpp @@ -2,6 +2,7 @@ #include "Socket/Address.hpp" #include "Socket/Log.hpp" #include "Packet/Packet.hpp" +#include "Utils/Snowflake.hpp" #include "precomp.hpp" int main() { @@ -10,18 +11,79 @@ int main() { Chattr::Address serveraddr(AF_INET6, "::1", 9011); - if (sock.connect(serveraddr) == INVALID_SOCKET); + if (sock.connect(serveraddr) == INVALID_SOCKET) { + spdlog::error("{}", strerror(errno)); return 0; + } spdlog::info("Connection established from {}", (std::string)serveraddr); + Chattr::Snowflake myId; + + Chattr::LoginRequestPacket loginRequestPacket; + loginRequestPacket.__data.packetType = Chattr::PacketCategory::PACKET_REQUEST; + loginRequestPacket.__data.requestType = Chattr::RequestType::LOGIN; + loginRequestPacket.__data.dataType = Chattr::DataType::BINARY; + loginRequestPacket.__data.packetLength = 14; + memcpy(loginRequestPacket.__data.data, "Hello, World!", 14); + loginRequestPacket.convToN(); + sock.send(loginRequestPacket.serialized, 8 + 14, 0); + + Chattr::LoginResponsePacket loginResponsePacket; + sock.recv(loginResponsePacket.serialized, 18, 0); + loginResponsePacket.convToH(); + ::memcpy(&myId, loginResponsePacket.__data.yourId, sizeof(Chattr::Snowflake)); + + Chattr::UsersListRequestPacket usersListRequestPacket; + usersListRequestPacket.__data.packetType = Chattr::PacketCategory::PACKET_REQUEST; + usersListRequestPacket.__data.requestType = Chattr::RequestType::USERS_LIST; + usersListRequestPacket.__data.dataType = Chattr::DataType::TEXT; + usersListRequestPacket.__data.packetLength = 0; + usersListRequestPacket.convToN(); + sock.send(usersListRequestPacket.serialized, 8, 0); + + std::vector> users; + + Chattr::UsersListResponsePacket usersListResponsePacket; + sock.recv(usersListResponsePacket.serialized, 8, 0); + std::uint16_t dataLength = ::ntohs(usersListResponsePacket.__data.packetLength); + sock.recv(usersListResponsePacket.serialized + 8, dataLength, 0); + usersListRequestPacket.convToH(); + + int usersCount = usersListResponsePacket.__data.usersCount; + users.reserve(usersCount); + Chattr::Snowflake userId; + ::memcpy(&userId.snowflake, usersListResponsePacket.__data.userId, sizeof(Chattr::Snowflake)); + users.emplace_back( + userId, + std::string((char*)usersListResponsePacket.__data.name, usersListResponsePacket.__data.packetLength - 14) + ); + + for (int i = 0; i < usersCount - 1; i++) { + sock.recv(usersListResponsePacket.serialized, 8, 0); + std::uint16_t dataLength = ::ntohs(usersListResponsePacket.__data.packetLength); + sock.recv(usersListResponsePacket.serialized + 8, dataLength, 0); + usersListRequestPacket.convToH(); + + ::memcpy(&userId.snowflake, usersListResponsePacket.__data.userId, sizeof(Chattr::Snowflake)); + users.emplace_back( + userId, + std::string((char*)usersListResponsePacket.__data.name, usersListResponsePacket.__data.packetLength - 14) + ); + } + Chattr::DataPostPacket dataPostPacket; dataPostPacket.__data.packetType = Chattr::PacketCategory::PACKET_POST; dataPostPacket.__data.requestType = Chattr::RequestType::DATA; dataPostPacket.__data.dataType = Chattr::DataType::TEXT; - dataPostPacket.__data.packetLength = 14; + dataPostPacket.__data.packetLength = 14 + 8; + for (auto user : users) + if (user.first != myId) { + ::memcpy(dataPostPacket.__data.destId, &user.first.snowflake, sizeof(Chattr::Snowflake)); + break; + } memcpy(dataPostPacket.__data.data, "Hello, World!", 14); dataPostPacket.convToN(); - sock.send(&dataPostPacket.serialized, 8 + 14, 0); + sock.send(&dataPostPacket.serialized, 6 + 14 + 8, 0); Chattr::ResponsePacket packet; sock.recv(&packet.serialized, 10, 0); diff --git a/Server/include/ServerManager/ServerManager.hpp b/Server/include/ServerManager/ServerManager.hpp index 97b2b74..217f2ef 100644 --- a/Server/include/ServerManager/ServerManager.hpp +++ b/Server/include/ServerManager/ServerManager.hpp @@ -51,8 +51,6 @@ public: void joinRoom(Snowflake UID, Snowflake RID); void exitRoom(Snowflake UID, Snowflake RID); - Snowflake generateID(); - void run(); private: ThreadPool threadPool_; @@ -64,14 +62,13 @@ private: struct Address clientAddr_; std::mutex resourceMutex_; - std::mutex snowflakeGenerateMutex_; std::unordered_map roomNames_; std::unordered_map>> rooms_; std::unordered_map> UID2userSocket_; - std::unordered_map userSocket2UID_; + std::unordered_map, Snowflake> userSocket2UID_; std::unordered_map userNames_; diff --git a/Server/src/ServerManager/ServerManager.cpp b/Server/src/ServerManager/ServerManager.cpp index 7919fc7..7c507f1 100644 --- a/Server/src/ServerManager/ServerManager.cpp +++ b/Server/src/ServerManager/ServerManager.cpp @@ -6,29 +6,52 @@ namespace Chattr { void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDATA* data) { Chattr::Packet pack; - int packetSize = data->transfrredbytes; + int packetSize = data->transferredbytes; if (data->recvbytes == 0) { - data->recvbytes = data->transfrredbytes; - data->transfrredbytes = 0; + data->recvbytes = data->transferredbytes; + data->transferredbytes = 0; } - else if (data->transfrredbytes <= data->sendbytes) { + else if (data->transferredbytes <= data->sendbytes) { data->IOCPInstance->recv(data, 1); return; } memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len); - pack.convToH(); - - std::string recvString; - bool packetError = false; DataPostPacket dataPostPacket; ResponsePacket responsePacket; - LoginRequestPacket loginRequestPacket; + + pack.convToH(); + + std::uint16_t packetLength = pack.__data.packetLength; switch (packetParser(pack)) { - case PacketSet::LOGINREQUEST: + case PacketSet::LOGINREQUEST: { + pack.convToN(); + LoginRequestPacket loginRequestPacket; + std::memcpy(&loginRequestPacket.serialized, &pack, 8 + pack.__data.packetLength); + loginRequestPacket.convToH(); + + std::string userName( + (char*)loginRequestPacket.__data.data, + loginRequestPacket.__data.packetLength); + registerUser(userName, data->socket); + + LoginResponsePacket loginResponsePacket; + loginResponsePacket.__data.packetType = Chattr::PacketCategory::PACKET_RESPONSE; + loginResponsePacket.__data.requestType = Chattr::RequestType::LOGIN; + loginResponsePacket.__data.dataType = Chattr::DataType::TEXT; + loginResponsePacket.__data.packetLength = sizeof(Chattr::ResponseStatusCode) + sizeof(Snowflake); + loginResponsePacket.__data.responseStatusCode = Chattr::ResponseStatusCode::OK; + ::memcpy(loginResponsePacket.__data.yourId, &userSocket2UID_[data->socket], sizeof(Snowflake)); + + loginResponsePacket.convToN(); + memcpy(data->wsabuf.buf, loginResponsePacket.serialized, 16); + data->sendbytes = 16; + data->transferredbytes = 16; + data->IOCPInstance->send(data, 1, 0); + } break; case PacketSet::ROOMCREATEREQUEST: break; @@ -40,23 +63,60 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA break; case PacketSet::USERSLISTREQUEST: break; - case PacketSet::DATAPOSTTEXT: - std::memcpy(&dataPostPacket.serialized, &pack, 1500); - dataPostPacket.__data.packetLength = (dataPostPacket.__data.packetLength > 1487) ? 1487 : dataPostPacket.__data.packetLength; - recvString = std::string((char*)dataPostPacket.__data.data, dataPostPacket.__data.packetLength - (sizeof(std::uint16_t) * 4)); - spdlog::info("Received [{}] from : [{}]", recvString, (std::string)data->socket->remoteAddr); + case PacketSet::DATAPOSTTEXT: { + pack.convToN(); + std::memcpy(&dataPostPacket.serialized, &pack, 8 + pack.__data.packetLength); + dataPostPacket.convToH(); - responsePacket.__data.packetType = Chattr::PacketCategory::PACKET_RESPONSE; - responsePacket.__data.requestType = Chattr::RequestType::DATA; - responsePacket.__data.dataType = Chattr::DataType::TEXT; - responsePacket.__data.packetLength = sizeof(Chattr::ResponseStatusCode); - responsePacket.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST; - responsePacket.convToN(); - memcpy(data->wsabuf.buf, responsePacket.serialized, responsePacket.__data.packetLength + 8); + Snowflake destID = {}; - // data->sendbytes = responsePacket.__data.packetLength + 5; - data->sendbytes = 1500; - data->IOCPInstance->send(data, 1, 0); + ::memcpy(&destID.snowflake, dataPostPacket.__data.destId, sizeof(Snowflake)); + + std::vector> destinationSockets; + + if (userNames_.find(destID) != userNames_.end()) + destinationSockets.push_back(UID2userSocket_[destID]); + else + for (auto user : rooms_[destID]) + destinationSockets.push_back(user.second); + + spdlog::info("Received [{}] from : [{}] to : [{}]", + std::string((char*)dataPostPacket.__data.data, dataPostPacket.__data.packetLength - (sizeof(std::uint16_t) * 5)), + (std::string)data->socket->remoteAddr, + destID.snowflake); + + responsePacket.__data.packetType = Chattr::PacketCategory::PACKET_RESPONSE; + responsePacket.__data.requestType = Chattr::RequestType::DATA; + responsePacket.__data.dataType = Chattr::DataType::TEXT; + responsePacket.__data.packetLength = sizeof(Chattr::ResponseStatusCode); + responsePacket.__data.responseStatusCode = Chattr::ResponseStatusCode::OK; + + responsePacket.convToN(); + memcpy(data->wsabuf.buf, responsePacket.serialized, 10); + data->sendbytes = 10; + data->transferredbytes = 10; + data->IOCPInstance->send(data, 1, 0); + + for (auto dest : destinationSockets) { + dataPostPacket.__data.packetType = Chattr::PacketCategory::PACKET_POST; + dataPostPacket.__data.requestType = Chattr::RequestType::DATA; + dataPostPacket.__data.dataType = Chattr::DataType::TEXT; + + Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA; + ::memset(&ptr->overlapped, 0, sizeof(OVERLAPPED)); + ptr->socket = dest; + ptr->recvbytes = ptr->sendbytes = 0; + ptr->wsabuf.buf = ptr->buf; + ptr->wsabuf.len = 1500; + ptr->IOCPInstance = data->IOCPInstance; + + dataPostPacket.convToN(); + memcpy(ptr->wsabuf.buf, dataPostPacket.serialized, packetLength + 6); + data->sendbytes = packetLength + 6; + data->transferredbytes = packetLength + 6; + data->IOCPInstance->send(ptr, 1, 0); + } + } break; case PacketSet::DATAPOSTBINARY: break; @@ -69,18 +129,18 @@ void ServerManager::_IOCPClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDA responsePacket.__data.dataType = Chattr::DataType::TEXT; responsePacket.__data.packetLength = sizeof(Chattr::ResponseStatusCode); responsePacket.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST; + responsePacket.convToN(); - memcpy(data->wsabuf.buf, responsePacket.serialized, responsePacket.__data.packetLength + 8); - // data->sendbytes = responsePacket.__data.packetLength + 5; - data->sendbytes = responsePacket.__data.packetLength + 8; - data->transfrredbytes = responsePacket.__data.packetLength + 8; + memcpy(data->wsabuf.buf, responsePacket.serialized, 10); + data->sendbytes = 10; + data->transferredbytes = 10; data->IOCPInstance->send(data, 1, 0); break; } } PacketSet ServerManager::packetParser(Packet Packet) { - if (Packet.__data.packetLength < 0 || Packet.__data.packetLength > 1500) + if (Packet.__data.packetLength < 0 || Packet.__data.packetLength > 1492) return PacketSet::INVALID; switch (Packet.__data.packetType) { @@ -138,14 +198,16 @@ PacketSet ServerManager::packetParser(Packet Packet) { default: return PacketSet::INVALID; } + + return PacketSet::INVALID; } void ServerManager::registerUser(std::string userName, std::shared_ptr sock) { std::lock_guard lock(resourceMutex_); - Snowflake UID = generateID(); + Snowflake UID = GenerateID(); userNames_[UID] = userName; UID2userSocket_[UID] = sock; - userSocket2UID_[sock.get()] = UID; + userSocket2UID_[sock] = UID; } void ServerManager::deleteUser(Snowflake UID) { @@ -153,7 +215,7 @@ void ServerManager::deleteUser(Snowflake UID) { userNames_.erase(UID); std::shared_ptr sock = UID2userSocket_[UID]; UID2userSocket_.erase(UID); - userSocket2UID_.erase(sock.get()); + userSocket2UID_.erase(sock); } std::vector> ServerManager::getUserList() { @@ -169,7 +231,7 @@ std::vector> ServerManager::getUserList() { void ServerManager::createRoom(std::string roomName) { std::lock_guard lock(resourceMutex_); - Snowflake RID = generateID(); + Snowflake RID = GenerateID(); roomNames_[RID] = roomName; rooms_[RID] = std::unordered_map>(); } @@ -202,31 +264,6 @@ void ServerManager::exitRoom(Snowflake UID, Snowflake RID) { rooms_[RID].erase(UID); } -static struct _EPOCH { - _EPOCH() { - EPOCH = std::chrono::system_clock::now(); - } - std::chrono::system_clock::time_point EPOCH; -} __EPOCH__; - -Snowflake ServerManager::generateID() { - std::lock_guard lock(snowflakeGenerateMutex_); -#ifdef _WIN32 - DWORD tid = GetCurrentThreadId(); -#elif __linux__ - pthread_t tid = pthread_self(); -#endif - static int sequence = 0; - Snowflake id = {}; - - auto timestamp = std::chrono::duration_cast(std::chrono::system_clock::now() - __EPOCH__.EPOCH); - id.timestamp = timestamp.count(); - id.instance = tid; - id.sequence = sequence++; - - return id; -} - void ServerManager::run() { while (true) { spdlog::info("Waiting for connection..."); diff --git a/impl/Socket/IOCP.cpp b/impl/Socket/IOCP.cpp index c04cf42..7f3c9fb 100644 --- a/impl/Socket/IOCP.cpp +++ b/impl/Socket/IOCP.cpp @@ -11,14 +11,14 @@ void IOCP::registerSocket(Chattr::IOCPPASSINDATA* data) { if (returnData == 0) completionPort_ = returnData; #elif __linux__ - // int flags = ::fcntl(data->socket.sock, F_GETFL); - // flags |= O_NONBLOCK; - // fcntl(data->socket.sock, F_SETFL, flags); + int flags = ::fcntl(data->socket->sock, F_GETFL); + flags |= O_NONBLOCK; + fcntl(data->socket->sock, F_SETFL, flags); struct epoll_event ev; ev.events = EPOLLIN | EPOLLONESHOT; ev.data.ptr = data; - int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket.sock, &ev); + int rc = epoll_ctl(epollfd_, EPOLL_CTL_ADD, data->socket->sock, &ev); if (rc < 0) log::critical("epoll_ctl()"); #endif @@ -32,7 +32,7 @@ int IOCP::recv(Chattr::IOCPPASSINDATA* data, int bufferCount) { struct epoll_event ev; ev.events = EPOLLIN | EPOLLONESHOT; ev.data.ptr = data; - return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket.sock, &ev); + return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); #endif } @@ -44,7 +44,7 @@ int IOCP::send(Chattr::IOCPPASSINDATA* data, int bufferCount, int __flags) { struct epoll_event ev; ev.events = EPOLLOUT | EPOLLONESHOT; ev.data.ptr = data; - return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket.sock, &ev); + return ::epoll_ctl(epollfd_, EPOLL_CTL_MOD, data->socket->sock, &ev); #endif } diff --git a/impl/Socket/Socket.cpp b/impl/Socket/Socket.cpp index d7206a0..d9575c9 100644 --- a/impl/Socket/Socket.cpp +++ b/impl/Socket/Socket.cpp @@ -67,6 +67,7 @@ int Socket::bind(Address __addr) { } int Socket::recvfrom(void *__restrict __buf, size_t __n, int __flags, struct Address& __addr) { + std::lock_guard lock(readMutex); int retVal = ::recvfrom(sock, (char*)__buf, __n, __flags, &__addr.addr, &__addr.length); if (retVal == SOCKET_ERROR) log::error("recvfrom()"); @@ -74,6 +75,7 @@ int Socket::recvfrom(void *__restrict __buf, size_t __n, int __flags, struct Add } int Socket::sendto(const void *__buf, size_t __n, int __flags, struct Address __addr) { + std::lock_guard lock(writeMutex); int retVal = ::sendto(sock, (char*)__buf, __n, __flags, &__addr.addr, __addr.length); if (retVal == SOCKET_ERROR) log::error("sendto()"); diff --git a/impl/Utils/GenerateID.cpp b/impl/Utils/GenerateID.cpp new file mode 100644 index 0000000..de82d3d --- /dev/null +++ b/impl/Utils/GenerateID.cpp @@ -0,0 +1,32 @@ +#include "Utils/Snowflake.hpp" + +namespace Chattr { + +Snowflake GenerateID() { + static struct _EPOCH { + _EPOCH() { + EPOCH = std::chrono::system_clock::now(); + } + std::chrono::system_clock::time_point EPOCH; + } __EPOCH__; + + static std::mutex snowflakeGenerateMutex_; + + std::lock_guard lock(snowflakeGenerateMutex_); +#ifdef _WIN32 + DWORD tid = GetCurrentThreadId(); +#elif __linux__ + pthread_t tid = pthread_self(); +#endif + static int sequence = 0; + Snowflake id = {}; + + auto timestamp = std::chrono::duration_cast(std::chrono::system_clock::now() - __EPOCH__.EPOCH); + id.timestamp = timestamp.count(); + id.instance = tid; + id.sequence = sequence++; + + return id; +} + +}; \ No newline at end of file diff --git a/impl/Utils/ThreadPool.cpp b/impl/Utils/ThreadPool.cpp index ef51c0c..c983628 100644 --- a/impl/Utils/ThreadPool.cpp +++ b/impl/Utils/ThreadPool.cpp @@ -63,7 +63,7 @@ void* ThreadPool::Worker() { #elif __linux__ pthread_t pid = pthread_self(); #endif - spdlog::debug("ThreadPool Worker : {} up", pid); + spdlog::trace("ThreadPool Worker : {} up", pid); while (!terminate_) { std::unique_lock lock(jobQueueMutex); jobQueueCV_.wait(lock, [this]() { return !this->jobs_.empty() || terminate_; }); @@ -78,10 +78,10 @@ void* ThreadPool::Worker() { jobs_.pop(); lock.unlock(); - spdlog::debug("ThreadPool Worker : {} Executing a job", pid); + spdlog::trace("ThreadPool Worker : {} Executing a job", pid); job(); } - spdlog::debug("ThreadPool Worker : {} down", pid); + spdlog::trace("ThreadPool Worker : {} down", pid); return nullptr; } diff --git a/include/Packet/Packet.hpp b/include/Packet/Packet.hpp index 9ba5a61..ff744ed 100644 --- a/include/Packet/Packet.hpp +++ b/include/Packet/Packet.hpp @@ -140,7 +140,7 @@ public: union { struct { PacketCategory packetType; - std::uint8_t padding; + RequestType requestType; DataType dataType; std::uint16_t packetLength; std::uint16_t destId[4]; @@ -200,7 +200,33 @@ public: } }; -class alignas(4) LoginResponsePacket : public ResponsePacket {}; +class alignas(4) LoginResponsePacket : public ResponsePacket { +public: + union { + struct { + PacketCategory packetType; + RequestType requestType; + DataType dataType; + std::uint16_t packetLength; + ResponseStatusCode responseStatusCode; + std::uint16_t yourId[4]; + std::uint8_t data[]; + } __data; + std::uint8_t serialized[1500] = ""; + }; + std::uint8_t* convToN() { + __data.packetLength = ::htons(__data.packetLength); + for (int i = 0; i < 4; i++) + __data.yourId[i] = ::htons(__data.yourId[i]); + return serialized; + } + std::uint8_t* convToH() { + __data.packetLength = ::ntohs(__data.packetLength); + for (int i = 0; i < 4; i++) + __data.yourId[i] = ::ntohs(__data.yourId[i]); + return serialized; + } +}; class alignas(4) RoomCreateResponsePacket : public ResponsePacket { public: diff --git a/include/Socket/IOCP.hpp b/include/Socket/IOCP.hpp index d0dc89a..2e6dec1 100644 --- a/include/Socket/IOCP.hpp +++ b/include/Socket/IOCP.hpp @@ -3,6 +3,7 @@ #include "Socket/WSAManager.hpp" #include "Socket/TCPSocket.hpp" #include "Socket/Log.hpp" +#include "Packet/Packet.hpp" #include #include @@ -31,7 +32,7 @@ struct IOCPPASSINDATA { char buf[1501]; std::uint32_t recvbytes; std::uint32_t sendbytes; - std::uint32_t transfrredbytes; + std::uint32_t transferredbytes; WSABUF wsabuf; IOCP* IOCPInstance; }; @@ -67,49 +68,95 @@ public: return; } - spdlog::trace("reading on tid: {} [{}]", tid, (std::string)iocpData->socket.remoteAddr); + std::lock_guard lock(iocpData->socket->readMutex); + + spdlog::trace("reading on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); int redSize = 0; - int packetSize = iocpData->wsabuf.len; + int headerSize = 8; int totalRedSize = 0; - while (totalRedSize < packetSize) { - redSize = iocpData->socket.recv(iocpData->buf, packetSize - totalRedSize, 0); + while (totalRedSize < headerSize) { + redSize = iocpData->socket->recv(iocpData->buf, headerSize - totalRedSize, 0); if (redSize <= 0) { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL); - delete iocpData; - return; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + spdlog::trace("{}", strerror(errno)); + break; + } else { + spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; + } + } totalRedSize += redSize; } - iocpData->sendbytes = packetSize - totalRedSize; - iocpData->recvbytes = totalRedSize; + + Packet packet; + ::memcpy(&packet.serialized, iocpData->buf, headerSize); + + redSize = 0; + int dataLength = ntohs(packet.__data.packetLength); + + while (totalRedSize < dataLength + headerSize) { + redSize = iocpData->socket->recv(iocpData->buf + totalRedSize, dataLength + headerSize - totalRedSize, 0); + + if (redSize <= 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + spdlog::trace("{}", strerror(errno)); + break; + } else { + spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; + } + + } + totalRedSize += redSize; + } + iocpData->transferredbytes = totalRedSize; threadPool->enqueueJob(callback, iocpData); }; static void socketWriter(ThreadPool* threadPool, epoll_event event, int epollfd, std::function callback) { IOCPPASSINDATA* iocpData = (IOCPPASSINDATA*)event.data.ptr; + + pthread_t tid = pthread_self(); - int packetSize = iocpData->wsabuf.len; + if (iocpData == nullptr) { + spdlog::error("invalid call on {}", tid); + return; + } + + std::lock_guard lock(iocpData->socket->writeMutex); + + spdlog::trace("Writing on tid: {} [{}]", tid, (std::string)iocpData->socket->remoteAddr); + + int packetSize = iocpData->transferredbytes; int totalSentSize = 0; int sentSize = 0; - spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket.remoteAddr); + spdlog::trace("Sending to: [{}]", (std::string)iocpData->socket->remoteAddr); while (totalSentSize < packetSize) { - sentSize = iocpData->socket.send(iocpData->buf, packetSize - totalSentSize, 0); + sentSize = iocpData->socket->send(iocpData->buf, packetSize - totalSentSize, 0); if (sentSize <= 0) { - spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket.remoteAddr); - ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket.sock, NULL); - delete iocpData; - return; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + spdlog::trace("{}", strerror(errno)); + break; + } else { + spdlog::debug("Client disconnected. [{}]", (std::string)iocpData->socket->remoteAddr); + ::epoll_ctl(epollfd, EPOLL_CTL_DEL, iocpData->socket->sock, NULL); + delete iocpData; + return; + } } totalSentSize += sentSize; } - iocpData->recvbytes = packetSize - totalSentSize; - iocpData->sendbytes = totalSentSize; + iocpData->transferredbytes = totalSentSize; threadPool->enqueueJob(callback, iocpData); }; static void iocpWatcher(ThreadPool* threadPool, int epollfd, std::function callback) { diff --git a/include/Socket/Socket.hpp b/include/Socket/Socket.hpp index 69482e0..429ee1a 100644 --- a/include/Socket/Socket.hpp +++ b/include/Socket/Socket.hpp @@ -33,6 +33,9 @@ public: int domain = 0; SOCKET sock = INVALID_SOCKET; + + std::mutex readMutex; + std::mutex writeMutex; protected: bool valid_ = false; }; diff --git a/include/Utils/Snowflake.hpp b/include/Utils/Snowflake.hpp index 3394468..5ed61e5 100644 --- a/include/Utils/Snowflake.hpp +++ b/include/Utils/Snowflake.hpp @@ -18,6 +18,8 @@ struct Snowflake { } }; +Snowflake GenerateID(); + } namespace std {