일단 거의 모든 버그 다 고칢..
This commit is contained in:
@@ -6,6 +6,7 @@
|
|||||||
#include "Utils/ThreadPool.hpp"
|
#include "Utils/ThreadPool.hpp"
|
||||||
#include "Utils/StringTokenizer.hpp"
|
#include "Utils/StringTokenizer.hpp"
|
||||||
#include "Utils/Snowflake.hpp"
|
#include "Utils/Snowflake.hpp"
|
||||||
|
#include "Socket/IOCP.hpp"
|
||||||
#include "Packet/Packet.hpp"
|
#include "Packet/Packet.hpp"
|
||||||
|
|
||||||
#include <iostream>
|
#include <iostream>
|
||||||
@@ -13,13 +14,16 @@
|
|||||||
|
|
||||||
#include "precomp.hpp"
|
#include "precomp.hpp"
|
||||||
|
|
||||||
void _TCPRecvClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::Address addr);
|
void _TCPRecvClient(Chattr::ThreadPool* threadPool, Chattr::IOCPPASSINDATA* data);
|
||||||
void _TCPSendClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::Address addr, std::queue<Chattr::ResponsePacket> packets);
|
void _TCPSendClient(Chattr::ThreadPool* threadPool, Chattr::TCPSocket sock, std::queue<Chattr::ResponsePacket> packets);
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
struct Chattr::WSAManager wsaManager;
|
|
||||||
auto config = Chattr::ConfigManager::load();
|
auto config = Chattr::ConfigManager::load();
|
||||||
Chattr::log::setDefaultLogger(config.logLevel, config.logFileName, config.logfileSize, config.logfileCount);
|
Chattr::log::setDefaultLogger(config.logLevel, config.logFileName, config.logfileSize, config.logfileCount);
|
||||||
|
Chattr::ThreadPool threadPool(0);
|
||||||
|
Chattr::IOCP iocp;
|
||||||
|
iocp.init(&threadPool, _TCPRecvClient);
|
||||||
|
// struct Chattr::WSAManager wsaManager;
|
||||||
|
|
||||||
Chattr::TCPSocket sock;
|
Chattr::TCPSocket sock;
|
||||||
struct Chattr::Address serveraddr;
|
struct Chattr::Address serveraddr;
|
||||||
@@ -43,61 +47,84 @@ int main() {
|
|||||||
#elif __linux__
|
#elif __linux__
|
||||||
pid_t pid = getpid();
|
pid_t pid = getpid();
|
||||||
#endif
|
#endif
|
||||||
spdlog::info("PID : {}", pid);
|
spdlog::debug("PID : {}", pid);
|
||||||
|
|
||||||
|
DWORD recvbytes = 0, flags = 0;
|
||||||
Chattr::ThreadPool threadPool(0);
|
|
||||||
/*int returnedIntager = 0;
|
|
||||||
int passvalue = 2;
|
|
||||||
threadPool.enqueueJob([](int& i){
|
|
||||||
spdlog::info("JobTest");
|
|
||||||
if (i == 2) {
|
|
||||||
i = 1;
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
return 1;
|
|
||||||
}, returnedIntager, std::ref(passvalue));*/
|
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
spdlog::info("Waiting for connection...");
|
spdlog::info("Waiting for connection...");
|
||||||
sock.accept(clientSock, clientAddr);
|
sock.accept(clientSock, clientAddr);
|
||||||
|
Chattr::IOCPPASSINDATA* ptr = new Chattr::IOCPPASSINDATA;
|
||||||
|
ZeroMemory(&ptr->overlapped, sizeof(OVERLAPPED));
|
||||||
|
ptr->socket = std::move(clientSock);
|
||||||
|
ptr->recvbytes = ptr->sendbytes = 0;
|
||||||
|
ptr->wsabuf.buf = ptr->buf;
|
||||||
|
ptr->wsabuf.len = 1500;
|
||||||
|
|
||||||
threadPool.enqueueJob(_TCPRecvClient, std::move(clientSock), clientAddr);
|
iocp.registerSocket(ptr->socket.sock);
|
||||||
|
|
||||||
|
int returnData = WSARecv(ptr->socket.sock, &ptr->wsabuf, 1, &recvbytes, &flags, &ptr->overlapped, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void _TCPRecvClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::Address addr) {
|
void _TCPRecvClient(Chattr::ThreadPool* thread, Chattr::IOCPPASSINDATA* data) {
|
||||||
|
#ifdef _WIN32
|
||||||
|
DWORD tid = GetCurrentThreadId();
|
||||||
|
#elif __linux__
|
||||||
|
pthread_t tid = pthread_self();
|
||||||
|
#endif
|
||||||
|
spdlog::info("entered recvfunc on TID: {}", tid);
|
||||||
Chattr::Packet pack;
|
Chattr::Packet pack;
|
||||||
int packetSize = 1500;
|
int packetSize = 1500;
|
||||||
int totalRedSize = 0;
|
int totalRedSize = 0;
|
||||||
int redSize = 0;
|
int redSize = 0;
|
||||||
|
|
||||||
while (totalRedSize < packetSize) {
|
// spdlog::info("Receving from: [{}]", (std::string)sock.remoteAddr);
|
||||||
redSize = sock.recv(&pack.serialized, 1500 - totalRedSize, 0);
|
|
||||||
|
//while (totalRedSize < packetSize) {
|
||||||
if (redSize <= 0) {
|
// redSize = data->socket.recv(&pack.serialized, 1500 - totalRedSize, 0);
|
||||||
spdlog::info("Client disconnected. [{}]", (std::string)addr);
|
//
|
||||||
return;
|
// if (redSize <= 0) {
|
||||||
}
|
// // spdlog::info("Client disconnected. [{}]", (std::string)sock.remoteAddr);
|
||||||
totalRedSize += redSize;
|
// return;
|
||||||
}
|
// }
|
||||||
|
// totalRedSize += redSize;
|
||||||
|
//}
|
||||||
|
|
||||||
|
memcpy(pack.serialized, data->wsabuf.buf, data->wsabuf.len);
|
||||||
|
|
||||||
std::string recvString;
|
std::string recvString;
|
||||||
bool packetError = false;
|
bool packetError = false;
|
||||||
Chattr::DataPostPacket dataPostPacket;
|
Chattr::DataPostPacket dataPostPacket;
|
||||||
|
|
||||||
|
|
||||||
|
Chattr::ResponsePacket packet;
|
||||||
|
std::queue<Chattr::ResponsePacket> responsePackets;
|
||||||
|
|
||||||
switch (pack.__data.packetType) {
|
switch (pack.__data.packetType) {
|
||||||
case Chattr::PacketType::PACKET_POST:
|
case Chattr::PacketType::PACKET_POST:
|
||||||
|
if (pack.__data.requestType != Chattr::RequestType::DATA){
|
||||||
|
packetError = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
switch (pack.__data.dataType) {
|
switch (pack.__data.dataType) {
|
||||||
case Chattr::DataType::TEXT:
|
case Chattr::DataType::TEXT:
|
||||||
std::memcpy(&dataPostPacket.serialized, &pack, 1500);
|
std::memcpy(&dataPostPacket.serialized, &pack, 1500);
|
||||||
dataPostPacket.convToH();
|
dataPostPacket.convToH();
|
||||||
recvString = std::string((char *)dataPostPacket.__data.data, dataPostPacket.__data.packetLength - sizeof(std::uint16_t));
|
dataPostPacket.__data.packetLength = (dataPostPacket.__data.packetLength > 1487) ? 1487 : dataPostPacket.__data.packetLength;
|
||||||
spdlog::info("Red size : {}, {} from : [{}]", redSize, recvString, (std::string)addr);
|
recvString = std::string((char *)dataPostPacket.__data.data, dataPostPacket.__data.packetLength - (sizeof(std::uint16_t)*4));
|
||||||
|
spdlog::info("Red size : {}, {} from : [{}]", totalRedSize, recvString, (std::string)data->socket.remoteAddr);
|
||||||
break;
|
break;
|
||||||
case Chattr::DataType::BINARY:
|
case Chattr::DataType::BINARY:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
packet.__data.packetType = Chattr::PacketType::PACKET_RESPONSE;
|
||||||
|
packet.__data.requestType = Chattr::RequestType::DATA;
|
||||||
|
packet.__data.dataType = Chattr::DataType::TEXT;
|
||||||
|
packet.__data.packetLength = sizeof(Chattr::ResponseStatusCode);
|
||||||
|
packet.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST;
|
||||||
|
packet.convToN();
|
||||||
|
responsePackets.push(packet);
|
||||||
packetError = true;
|
packetError = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -117,6 +144,13 @@ void _TCPRecvClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::
|
|||||||
case Chattr::RequestType::USERS_LIST:
|
case Chattr::RequestType::USERS_LIST:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
packet.__data.packetType = Chattr::PacketType::PACKET_RESPONSE;
|
||||||
|
packet.__data.requestType = Chattr::RequestType::DATA;
|
||||||
|
packet.__data.dataType = Chattr::DataType::TEXT;
|
||||||
|
packet.__data.packetLength = sizeof(Chattr::ResponseStatusCode);
|
||||||
|
packet.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST;
|
||||||
|
packet.convToN();
|
||||||
|
responsePackets.push(packet);
|
||||||
packetError = true;
|
packetError = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -124,42 +158,48 @@ void _TCPRecvClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::
|
|||||||
case Chattr::PacketType::PACKET_CONTINUE:
|
case Chattr::PacketType::PACKET_CONTINUE:
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
|
packet.__data.packetType = Chattr::PacketType::PACKET_RESPONSE;
|
||||||
|
packet.__data.requestType = Chattr::RequestType::DATA;
|
||||||
|
packet.__data.dataType = Chattr::DataType::TEXT;
|
||||||
|
packet.__data.packetLength = sizeof(Chattr::ResponseStatusCode);
|
||||||
|
packet.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST;
|
||||||
|
packet.convToN();
|
||||||
|
responsePackets.push(packet);
|
||||||
packetError = true;
|
packetError = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (packetError) {
|
Sleep(10000);
|
||||||
Chattr::ResponsePacket packet;
|
|
||||||
packet.__data.responseStatusCode = Chattr::ResponseStatusCode::BAD_REQUEST;
|
/*if (packetError) {
|
||||||
std::queue<Chattr::ResponsePacket> packs;
|
thread->enqueueJob(_TCPRecvClient, data);;
|
||||||
packet.convToN();
|
|
||||||
packs.push(packet);
|
|
||||||
thread->enqueueJob(_TCPSendClient, std::move(sock), addr, packs);
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
thread->enqueueJob(_TCPRecvClient, std::move(sock), addr);
|
thread->enqueueJob(_TCPRecvClient, data);*/
|
||||||
}
|
}
|
||||||
|
|
||||||
void _TCPSendClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, Chattr::Address addr, std::queue<Chattr::ResponsePacket> packets) {
|
void _TCPSendClient(Chattr::ThreadPool* thread, Chattr::TCPSocket sock, std::queue<Chattr::ResponsePacket> packets) {
|
||||||
Chattr::ResponsePacket pack = packets.front();
|
Chattr::ResponsePacket pack = packets.front();
|
||||||
packets.pop();
|
packets.pop();
|
||||||
int packetSize = 1500;
|
int packetSize = 1500;
|
||||||
int totalSentSize = 0;
|
int totalSentSize = 0;
|
||||||
int sentSize = 0;
|
int sentSize = 0;
|
||||||
|
|
||||||
|
spdlog::info("Sending to: [{}]", (std::string)sock.remoteAddr);
|
||||||
|
|
||||||
while (totalSentSize < packetSize) {
|
while (totalSentSize < packetSize) {
|
||||||
|
|
||||||
sentSize = sock.send(&pack.serialized, 1500 - totalSentSize, 0);
|
sentSize = sock.send(&pack.serialized, 1500 - totalSentSize, 0);
|
||||||
|
|
||||||
if (sentSize <= 0) {
|
if (sentSize <= 0) {
|
||||||
spdlog::info("Client disconnected. [{}]", (std::string)addr);
|
spdlog::info("Client disconnected. [{}]", (std::string)sock.remoteAddr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
totalSentSize += sentSize;
|
totalSentSize += sentSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (packets.empty())
|
/*if (packets.empty())
|
||||||
thread->enqueueJob(_TCPRecvClient, std::move(sock), addr);
|
thread->enqueueJob(_TCPRecvClient, std::move(sock));
|
||||||
else
|
else
|
||||||
thread->enqueueJob(_TCPSendClient, std::move(sock), addr, packets);
|
thread->enqueueJob(_TCPSendClient, std::move(sock), packets);*/
|
||||||
}
|
}
|
||||||
@@ -1,24 +1,15 @@
|
|||||||
#include "Socket/IOCP.hpp"
|
#include "Socket/IOCP.hpp"
|
||||||
#include "Socket/WSAManager.hpp"
|
#include "Utils/ThreadPool.hpp"
|
||||||
#include "Socket/Log.hpp"
|
|
||||||
#include "precomp.hpp"
|
#include "precomp.hpp"
|
||||||
|
|
||||||
namespace Chattr {
|
namespace Chattr {
|
||||||
|
|
||||||
IOCP::IOCP(std::shared_ptr<ThreadPool> __IOCPThread) {
|
void IOCP::registerSocket(SOCKET sock) {
|
||||||
init(__IOCPThread);
|
|
||||||
}
|
|
||||||
|
|
||||||
IOCP::~IOCP() {
|
|
||||||
}
|
|
||||||
|
|
||||||
void IOCP::init(std::shared_ptr<ThreadPool> __IOCPThread) {
|
|
||||||
IOCPThread_ = __IOCPThread;
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
struct Chattr::WSAManager wsaManager;
|
HANDLE returnData = ::CreateIoCompletionPort((HANDLE)sock, completionPort_, sock, 0);
|
||||||
completinPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
|
if (returnData == 0)
|
||||||
if (completinPort_ == NULL)
|
completionPort_ = returnData;
|
||||||
log::critical("CreateIoCompletionPort()");
|
|
||||||
#elif __linux__
|
#elif __linux__
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -15,8 +15,8 @@ Socket::~Socket() {
|
|||||||
int Socket::init(int domain, int type, int protocol) {
|
int Socket::init(int domain, int type, int protocol) {
|
||||||
this->domain = domain;
|
this->domain = domain;
|
||||||
|
|
||||||
sock_ = ::socket(domain, type, protocol);
|
sock = ::socket(domain, type, protocol);
|
||||||
if (sock_ == INVALID_SOCKET)
|
if (sock == INVALID_SOCKET)
|
||||||
log::critical("socket()");
|
log::critical("socket()");
|
||||||
|
|
||||||
valid_ = true;
|
valid_ = true;
|
||||||
@@ -28,9 +28,9 @@ void Socket::destruct() {
|
|||||||
if (!valid_)
|
if (!valid_)
|
||||||
return;
|
return;
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
::closesocket(sock_);
|
::closesocket(sock);
|
||||||
#elif __linux__
|
#elif __linux__
|
||||||
::close(sock_);
|
::close(sock);
|
||||||
#endif
|
#endif
|
||||||
valid_ = false;
|
valid_ = false;
|
||||||
}
|
}
|
||||||
@@ -38,7 +38,7 @@ void Socket::destruct() {
|
|||||||
Socket::operator SOCKET() {
|
Socket::operator SOCKET() {
|
||||||
if (valid_) {
|
if (valid_) {
|
||||||
valid_ = false;
|
valid_ = false;
|
||||||
return sock_;
|
return sock;
|
||||||
}
|
}
|
||||||
spdlog::critical("No valid socket created.");
|
spdlog::critical("No valid socket created.");
|
||||||
return INVALID_SOCKET;
|
return INVALID_SOCKET;
|
||||||
@@ -50,41 +50,41 @@ void Socket::set(const SOCKET __sock, int __domain) {
|
|||||||
|
|
||||||
destruct();
|
destruct();
|
||||||
|
|
||||||
sock_ = __sock;
|
sock = __sock;
|
||||||
valid_ = true;
|
valid_ = true;
|
||||||
};
|
};
|
||||||
|
|
||||||
int Socket::bind(Address __addr) {
|
int Socket::bind(Address __addr) {
|
||||||
bindAddr = __addr;
|
bindAddr = __addr;
|
||||||
int retVal = ::bind(sock_, &__addr.addr, __addr.length);
|
int retVal = ::bind(sock, &__addr.addr, __addr.length);
|
||||||
if (retVal == INVALID_SOCKET)
|
if (retVal == INVALID_SOCKET)
|
||||||
log::critical("bind()");
|
log::critical("bind()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int Socket::recvfrom(void *__restrict __buf, size_t __n, int __flags, struct Address& __addr) {
|
int Socket::recvfrom(void *__restrict __buf, size_t __n, int __flags, struct Address& __addr) {
|
||||||
int retVal = ::recvfrom(sock_, (char*)__buf, __n, __flags, &__addr.addr, &__addr.length);
|
int retVal = ::recvfrom(sock, (char*)__buf, __n, __flags, &__addr.addr, &__addr.length);
|
||||||
if (retVal == SOCKET_ERROR)
|
if (retVal == SOCKET_ERROR)
|
||||||
log::error("recvfrom()");
|
log::error("recvfrom()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int Socket::sendto(const void *__buf, size_t __n, int __flags, struct Address __addr) {
|
int Socket::sendto(const void *__buf, size_t __n, int __flags, struct Address __addr) {
|
||||||
int retVal = ::sendto(sock_, (char*)__buf, __n, __flags, &__addr.addr, __addr.length);
|
int retVal = ::sendto(sock, (char*)__buf, __n, __flags, &__addr.addr, __addr.length);
|
||||||
if (retVal == SOCKET_ERROR)
|
if (retVal == SOCKET_ERROR)
|
||||||
log::error("sendto()");
|
log::error("sendto()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
Socket::Socket(Socket &&other_) {
|
Socket::Socket(Socket &&other_) noexcept {
|
||||||
other_.valid_ = false;
|
other_.valid_ = false;
|
||||||
sock_ = other_.sock_;
|
memcpy(this, &other_, sizeof(Socket));
|
||||||
valid_ = true;
|
valid_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
Socket& Socket::operator=(Socket && other_) {
|
Socket& Socket::operator=(Socket && other_) noexcept {
|
||||||
other_.valid_ = false;
|
other_.valid_ = false;
|
||||||
sock_ = other_.sock_;
|
memcpy(this, &other_, sizeof(Socket));
|
||||||
valid_ = true;
|
valid_ = true;
|
||||||
|
|
||||||
return *this;
|
return *this;
|
||||||
|
|||||||
@@ -9,35 +9,36 @@ int TCPSocket::init(int domain) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int TCPSocket::listen(int __n) {
|
int TCPSocket::listen(int __n) {
|
||||||
int retVal = ::listen(sock_, __n);
|
int retVal = ::listen(sock, __n);
|
||||||
if (retVal == INVALID_SOCKET)
|
if (retVal == INVALID_SOCKET)
|
||||||
log::error("listen()");
|
log::error("listen()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
void TCPSocket::accept(TCPSocket& newSock, Address& __addr) {
|
void TCPSocket::accept(TCPSocket& newSock, Address& __addr) {
|
||||||
newSock.set(::accept(sock_, &__addr.addr, &__addr.length), domain);
|
newSock.set(::accept(sock, &__addr.addr, &__addr.length), domain);
|
||||||
if (newSock == INVALID_SOCKET)
|
memcpy(&newSock.remoteAddr, &__addr, sizeof(Chattr::Address));
|
||||||
|
if (newSock.sock == INVALID_SOCKET)
|
||||||
log::error("accept()");
|
log::error("accept()");
|
||||||
}
|
}
|
||||||
|
|
||||||
int TCPSocket::connect(Address& serveraddr) {
|
int TCPSocket::connect(Address& serveraddr) {
|
||||||
int retVal = ::connect(sock_, (struct sockaddr *)&serveraddr.addr, serveraddr.length);
|
int retVal = ::connect(sock, (struct sockaddr *)&serveraddr.addr, serveraddr.length);
|
||||||
remoteAddr = serveraddr;
|
memcpy(&remoteAddr, &serveraddr, sizeof(Chattr::Address));
|
||||||
if (retVal == INVALID_SOCKET)
|
if (retVal == INVALID_SOCKET)
|
||||||
log::error("connect()");
|
log::error("connect()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int TCPSocket::recv(void *__restrict __buf, size_t __n, int __flags) {
|
int TCPSocket::recv(void *__restrict __buf, size_t __n, int __flags) {
|
||||||
int retVal = ::recv(sock_, (char *)__buf, __n, __flags);
|
int retVal = ::recv(sock, (char *)__buf, __n, __flags);
|
||||||
if (retVal == SOCKET_ERROR)
|
if (retVal == SOCKET_ERROR)
|
||||||
log::error("recv()");
|
log::error("recv()");
|
||||||
return retVal;
|
return retVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int TCPSocket::send(const void *__buf, size_t __n, int __flags) {
|
int TCPSocket::send(const void *__buf, size_t __n, int __flags) {
|
||||||
int retVal = ::send(sock_, (char*)__buf, __n, __flags);
|
int retVal = ::send(sock, (char*)__buf, __n, __flags);
|
||||||
if (retVal == SOCKET_ERROR)
|
if (retVal == SOCKET_ERROR)
|
||||||
log::error("send()");
|
log::error("send()");
|
||||||
return retVal;
|
return retVal;
|
||||||
|
|||||||
@@ -11,11 +11,7 @@ ThreadPool::ThreadPool(std::uint32_t numThreads) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ThreadPool::~ThreadPool() {
|
ThreadPool::~ThreadPool() {
|
||||||
terminate_ = true;
|
terminate();
|
||||||
jobQueueCV_.notify_all();
|
|
||||||
|
|
||||||
for (auto& t : workers_)
|
|
||||||
t.join();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ThreadPool::init(std::uint32_t numThreads) {
|
void ThreadPool::init(std::uint32_t numThreads) {
|
||||||
@@ -29,12 +25,36 @@ void ThreadPool::init(std::uint32_t numThreads) {
|
|||||||
numCPU = sysconf(_SC_NPROCESSORS_ONLN);
|
numCPU = sysconf(_SC_NPROCESSORS_ONLN);
|
||||||
#endif
|
#endif
|
||||||
spdlog::info("Auto-detected cpu count: {}", numCPU);
|
spdlog::info("Auto-detected cpu count: {}", numCPU);
|
||||||
spdlog::info("Set ThreadPool Worker count to: {}", numCPU);
|
if (numCPU == 1 || numCPU == 2) {
|
||||||
|
numCPU = 4;
|
||||||
|
spdlog::info("Set ThreadPool Worker count to: {} due to program to oprate concurrently", numCPU);
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
spdlog::info("Set ThreadPool Worker count to: {}", numCPU);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
threadCount = numCPU;
|
||||||
workers_.reserve(numCPU);
|
workers_.reserve(numCPU);
|
||||||
|
|
||||||
while (numCPU--)
|
while (numCPU--)
|
||||||
workers_.push_back([this]() { this->Worker(); });
|
workers_.push_back([this]() {
|
||||||
|
this->Worker();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::terminate() {
|
||||||
|
terminate_ = true;
|
||||||
|
jobQueueCV_.notify_all();
|
||||||
|
|
||||||
|
spdlog::debug("waiting for threads to end their jobs...");
|
||||||
|
for (auto& t : workers_)
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void ThreadPool::respawnWorker(std::uint32_t numThreads) {
|
||||||
|
terminate();
|
||||||
|
terminate_ = false;
|
||||||
|
init(numThreads);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* ThreadPool::Worker() {
|
void* ThreadPool::Worker() {
|
||||||
@@ -43,18 +63,17 @@ void* ThreadPool::Worker() {
|
|||||||
#elif __linux__
|
#elif __linux__
|
||||||
pthread_t pid = pthread_self();
|
pthread_t pid = pthread_self();
|
||||||
#endif
|
#endif
|
||||||
spdlog::info("ThreadPool Worker : {} up.", pid);
|
|
||||||
while (!terminate_) {
|
while (!terminate_) {
|
||||||
std::unique_lock<std::mutex> lock(jobQueueMutex);
|
std::unique_lock<std::mutex> lock(jobQueueMutex);
|
||||||
jobQueueCV_.wait(lock, [this]() { return !this->jobs_.empty() || terminate_; });
|
jobQueueCV_.wait(lock, [this]() { return !this->jobs_.empty() || terminate_; });
|
||||||
if (this->jobs_.empty())
|
if (this->jobs_.empty())
|
||||||
return nullptr;
|
break;
|
||||||
|
|
||||||
auto job = std::move(jobs_.front());
|
auto job = std::move(jobs_.front());
|
||||||
jobs_.pop();
|
jobs_.pop();
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
|
|
||||||
spdlog::info("ThreadPool Worker : {} Executing a job", pid);
|
spdlog::debug("ThreadPool Worker : {} Executing a job", pid);
|
||||||
job();
|
job();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,24 +1,81 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include "Utils/ThreadPool.hpp"
|
#include "Utils/ThreadPool.hpp"
|
||||||
|
#include "Socket/WSAManager.hpp"
|
||||||
|
#include "Socket/TCPSocket.hpp"
|
||||||
|
#include "Socket/Log.hpp"
|
||||||
|
#include <functional>
|
||||||
|
|
||||||
namespace Chattr {
|
namespace Chattr {
|
||||||
|
|
||||||
|
struct IOCPPASSINDATA {
|
||||||
|
OVERLAPPED overlapped;
|
||||||
|
TCPSocket socket;
|
||||||
|
char buf[1501];
|
||||||
|
int recvbytes;
|
||||||
|
int sendbytes;
|
||||||
|
WSABUF wsabuf;
|
||||||
|
};
|
||||||
|
|
||||||
class IOCP {
|
class IOCP {
|
||||||
public:
|
public:
|
||||||
IOCP(std::shared_ptr<ThreadPool> __IOCPThread);
|
static void iocpWather(ThreadPool* threadPool, HANDLE completionPort_, std::function<void(ThreadPool*, IOCPPASSINDATA*)> callback) {
|
||||||
~IOCP();
|
DWORD tid = GetCurrentThreadId();
|
||||||
|
spdlog::debug("Waiting IO to complete on TID: {}.", tid);
|
||||||
|
IOCPPASSINDATA* data;
|
||||||
|
SOCKET sock;
|
||||||
|
DWORD cbTransfrred;
|
||||||
|
int retVal = GetQueuedCompletionStatus(completionPort_, &cbTransfrred, (PULONG_PTR)&sock, (LPOVERLAPPED*)&data, INFINITE);
|
||||||
|
if (retVal == 0 || cbTransfrred == 0) {
|
||||||
|
spdlog::info("Client disconnected. [{}]", (std::string)(data->socket.remoteAddr));
|
||||||
|
threadPool->enqueueJob(iocpWather, completionPort_, callback);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
threadPool->enqueueJob(callback, data);
|
||||||
|
threadPool->enqueueJob(iocpWather, completionPort_, callback);
|
||||||
|
};
|
||||||
|
|
||||||
void init(std::shared_ptr<ThreadPool> __IOCPThread);
|
template<typename _Callable>
|
||||||
|
void init(ThreadPool* __IOCPThread, _Callable&& callback) {
|
||||||
|
IOCPThread_ = __IOCPThread;
|
||||||
|
#ifdef _WIN32
|
||||||
|
completionPort_ = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
|
||||||
|
if (completionPort_ == NULL)
|
||||||
|
log::critical("CreateIoCompletionPort()");
|
||||||
|
|
||||||
|
auto boundFunc = [callback = std::move(callback)](ThreadPool* __IOCPThread, IOCPPASSINDATA* data) mutable {
|
||||||
|
callback(__IOCPThread, data);
|
||||||
|
};
|
||||||
|
|
||||||
|
int tCount = __IOCPThread->threadCount;
|
||||||
|
|
||||||
|
spdlog::info("Resizing threadpool size to: {}", tCount * 2);
|
||||||
|
|
||||||
|
__IOCPThread->respawnWorker(tCount * 2);
|
||||||
|
|
||||||
|
spdlog::info("Set IOCP Worker count to: {}", tCount);
|
||||||
|
for (int i = 0; i < tCount; i++) {
|
||||||
|
std::function<void(ThreadPool*, IOCPPASSINDATA*)> task(boundFunc);
|
||||||
|
__IOCPThread->enqueueJob(iocpWather, completionPort_, task);
|
||||||
|
}
|
||||||
|
#elif __linux__
|
||||||
|
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
void registerSocket(SOCKET sock);
|
||||||
|
|
||||||
int recv(void* __restrict __buf, size_t __n, int __flags);
|
int recv(void* __restrict __buf, size_t __n, int __flags);
|
||||||
int send(const void* __buf, size_t __n, int __flags);
|
int send(const void* __buf, size_t __n, int __flags);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::shared_ptr<ThreadPool> IOCPThread_;
|
struct Chattr::WSAManager wsaManager;
|
||||||
|
ThreadPool* IOCPThread_;
|
||||||
|
|
||||||
#ifdef _WIN32
|
#ifdef _WIN32
|
||||||
HANDLE completinPort_;
|
HANDLE completionPort_ = INVALID_HANDLE_VALUE;
|
||||||
#elif __linux__
|
#elif __linux__
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -23,17 +23,17 @@ public:
|
|||||||
int sendto(const void *__buf, size_t __n, int __flags, struct Address __addr);
|
int sendto(const void *__buf, size_t __n, int __flags, struct Address __addr);
|
||||||
|
|
||||||
Socket(const Socket&) = delete;
|
Socket(const Socket&) = delete;
|
||||||
Socket(Socket&&);
|
Socket(Socket&&) noexcept;
|
||||||
Socket& operator=(const Socket&) = delete;
|
Socket& operator=(const Socket&) = delete;
|
||||||
Socket& operator=(Socket&&);
|
Socket& operator=(Socket&&) noexcept;
|
||||||
|
|
||||||
struct Address bindAddr = {};
|
struct Address bindAddr = {};
|
||||||
struct Address remoteAddr = {};
|
struct Address remoteAddr = {};
|
||||||
|
|
||||||
int domain = 0;
|
int domain = 0;
|
||||||
|
SOCKET sock = INVALID_SOCKET;
|
||||||
protected:
|
protected:
|
||||||
bool valid_ = false;
|
bool valid_ = false;
|
||||||
SOCKET sock_ = INVALID_SOCKET;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
}
|
}
|
||||||
@@ -39,7 +39,7 @@ public:
|
|||||||
requires (!std::is_same_v<std::decay_t<_Callable>, Thread>) &&
|
requires (!std::is_same_v<std::decay_t<_Callable>, Thread>) &&
|
||||||
(!std::is_void_v<std::invoke_result_t<_Callable, _Args...>>)
|
(!std::is_void_v<std::invoke_result_t<_Callable, _Args...>>)
|
||||||
Thread(_Callable&& __f, _Args&&... __args) {
|
Thread(_Callable&& __f, _Args&&... __args) {
|
||||||
auto boundFunc = [this, __f = std::move(__f), ... __args = std::move(__args)]() mutable {
|
auto boundFunc = [this, __f, ... __args = std::move(__args)]() mutable {
|
||||||
returnValuePtr = new std::invoke_result_t<_Callable, _Args...>(__f(std::move(__args)...));
|
returnValuePtr = new std::invoke_result_t<_Callable, _Args...>(__f(std::move(__args)...));
|
||||||
};
|
};
|
||||||
std::packaged_task<void()>* funcPtr = new std::packaged_task<void()>(std::move(boundFunc));
|
std::packaged_task<void()>* funcPtr = new std::packaged_task<void()>(std::move(boundFunc));
|
||||||
@@ -53,7 +53,7 @@ public:
|
|||||||
requires (!std::is_same_v<std::decay_t<_Callable>, Thread>) &&
|
requires (!std::is_same_v<std::decay_t<_Callable>, Thread>) &&
|
||||||
std::is_void_v<std::invoke_result_t<_Callable, _Args...>>
|
std::is_void_v<std::invoke_result_t<_Callable, _Args...>>
|
||||||
Thread(_Callable&& __f, _Args&&... __args) {
|
Thread(_Callable&& __f, _Args&&... __args) {
|
||||||
auto boundFunc = [this, __f = std::move(__f), ... __args = std::move(__args)]() mutable {
|
auto boundFunc = [this, __f, ... __args = std::move(__args)]() mutable {
|
||||||
__f(std::move(__args)...);
|
__f(std::move(__args)...);
|
||||||
};
|
};
|
||||||
std::packaged_task<void()>* funcPtr = new std::packaged_task<void()>(std::move(boundFunc));
|
std::packaged_task<void()>* funcPtr = new std::packaged_task<void()>(std::move(boundFunc));
|
||||||
|
|||||||
@@ -16,6 +16,9 @@ public:
|
|||||||
~ThreadPool();
|
~ThreadPool();
|
||||||
|
|
||||||
void init(std::uint32_t numThreads);
|
void init(std::uint32_t numThreads);
|
||||||
|
void terminate();
|
||||||
|
|
||||||
|
void respawnWorker(std::uint32_t numThreads);
|
||||||
|
|
||||||
template<typename _Callable, typename... _Args>
|
template<typename _Callable, typename... _Args>
|
||||||
requires (!std::is_void_v<std::invoke_result_t<_Callable, ThreadPool*, _Args...>>)
|
requires (!std::is_void_v<std::invoke_result_t<_Callable, ThreadPool*, _Args...>>)
|
||||||
@@ -26,7 +29,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(jobQueueMutex);
|
std::lock_guard<std::mutex> lock(jobQueueMutex);
|
||||||
auto boundFunc = [this, &retVal, __job = std::move(__job), ... __args = std::move(__args)]() mutable {
|
auto boundFunc = [this, &retVal, __job, ... __args = std::move(__args)]() mutable {
|
||||||
retVal = __job(this, std::move(__args)...);
|
retVal = __job(this, std::move(__args)...);
|
||||||
};
|
};
|
||||||
auto task = std::packaged_task<void()>(std::move(boundFunc));
|
auto task = std::packaged_task<void()>(std::move(boundFunc));
|
||||||
@@ -44,7 +47,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::lock_guard<std::mutex> lock(jobQueueMutex);
|
std::lock_guard<std::mutex> lock(jobQueueMutex);
|
||||||
auto boundFunc = [this, __job = std::move(__job), ... __args = std::move(__args)]() mutable {
|
auto boundFunc = [this, __job, ... __args = std::move(__args)]() mutable {
|
||||||
__job(this, std::move(__args)...);
|
__job(this, std::move(__args)...);
|
||||||
};
|
};
|
||||||
auto task = std::packaged_task<void()>(std::move(boundFunc));
|
auto task = std::packaged_task<void()>(std::move(boundFunc));
|
||||||
@@ -54,6 +57,7 @@ public:
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int threadCount = 0;
|
||||||
private:
|
private:
|
||||||
void* Worker();
|
void* Worker();
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
#include <winsock2.h>
|
#include <winsock2.h>
|
||||||
#include <ws2tcpip.h>
|
#include <ws2tcpip.h>
|
||||||
#include <ws2bth.h>
|
#include <ws2bth.h>
|
||||||
|
#include <ws2def.h>
|
||||||
#include <windows.h>
|
#include <windows.h>
|
||||||
#define in_addr_t ULONG
|
#define in_addr_t ULONG
|
||||||
#elif __linux__
|
#elif __linux__
|
||||||
|
|||||||
Reference in New Issue
Block a user