< 변수들 - variables >
// IOCP context definition.
// One IOContext is allocated per registered socket. Its address is used as the
// completion key when the socket is associated with the completion port, so a
// worker thread gets the same pointer back from GetQueuedCompletionStatus.
// NOTE(review): the object owns hCloseEvent and wsaBuf points into its own
// buf, so copying an IOContext would be unsafe; it is only handled by pointer
// in the code shown here.
class IOContext {
public: // the registration code and the worker threads access these members directly
    WSAOVERLAPPED wsaOverlapped; // overlapped block passed to WSARecvFrom / PostQueuedCompletionStatus
    int socket;                  // socket number this context belongs to
    WSABUF wsaBuf;               // buffer descriptor; set up to point at buf
    char buf[10240];             // receive buffer
    HANDLE hCompletionPort;      // value returned by CreateIoCompletionPort when the socket is registered
    HANDLE hCloseEvent;          // signalled by a worker thread once it has marked the socket closed
    BOOL bClosed;                // TRUE after a worker thread has marked the socket closed

    IOContext(int sock);
    ~IOContext();
};
// Builds the context for one socket: points the WSABUF at the internal receive
// buffer, clears the overlapped block and creates the auto-reset, initially
// non-signalled close event.
//   sock - socket number this context is bound to.
IOContext::IOContext(int sock)
{
    socket = sock;
    wsaBuf.len = sizeof(buf);
    wsaBuf.buf = buf;
    SecureZeroMemory((PVOID)&wsaOverlapped, sizeof(WSAOVERLAPPED));
    // Not associated with a completion port yet; give the member a defined
    // value instead of leaving it indeterminate.
    hCompletionPort = NULL;
    hCloseEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (hCloseEvent == NULL) {
        // Without the event nobody can be told that the socket was closed;
        // report it rather than failing silently.
        printf("[%s] CreateEvent error: %d\r\n", __FUNCTION__, (int)GetLastError());
    }
    bClosed = FALSE;
}
// Releases the close event created by the constructor.
IOContext::~IOContext()
{
    // CreateEvent may have failed and returned NULL; do not hand an invalid
    // handle to CloseHandle.
    if (hCloseEvent != NULL) {
        CloseHandle(hCloseEvent);
        hCloseEvent = NULL;
    }
}
// Shared IOCP state used by init(), start(), registerSocket(),
// unregisterSocket() and WorkerThread().
List<IOContext> contextList; // IOCP contexts, managed as a linked list
HANDLE hCompletionPort; // the I/O completion port created in init()
SYSTEM_INFO systemInfo; // filled in by GetSystemInfo() in init()
DWORD dwThreadCount; // number of worker threads; init() sets it to twice the processor count
HANDLE hThreads[16]; // worker thread handles stored by start(); capacity is fixed at 16
< IOCP 초기화 및 쓰레드 생성 - initialize the IOCP handle and create the worker threads >
void init()
{
hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
GetSystemInfo(&systemInfo);
dwThreadCount = systemInfo.dwNumberOfProcessors*2;
start();
}
void start()
{
for (int i=0; i < dwThreadCount; i++)
{
hThreads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, this, 0, NULL);
}
}
< IOCP 에 소켓등록 - register a socket with the IOCP handle >
void registerSocket(int socketNum)
{
IOContext *context = new IOContext(socketNum);
contextList.insert(context);
context->hCompletionPort = CreateIoCompletionPort((HANDLE)socketNum, hCompletionPort, (ULONG_PTR)context, 0);
int err;
if (!PostQueuedCompletionStatus(hCompletionPort, 0, (ULONG_PTR)context, &context->wsaOverlapped)) {
if ((err = WSAGetLastError()) != WSA_IO_PENDING)
printf("[%s] PostQueuedCompletionStatus error: %d\r\n", err);
}
}
< IOCP 에 소켓등록해제 - unregister a socket from the IOCP handle >
// Unregisters a socket from the completion port.
// Closes the socket, waits up to five seconds for a worker thread to signal
// the context's close event, then removes the context from contextList.
//   socketNum - socket to unregister.
// NOTE(review): whether contextList.remove() also frees the IOContext is not
// visible here - confirm, otherwise the context allocated in registerSocket
// is leaked.
void unregisterSocket(int socketNum)
{
    IOContext *context = (IOContext *)contextList.search(socketNum);

    // Closing the socket is what makes the outstanding receive complete with
    // an error, which in turn lets a worker thread signal hCloseEvent.
    closesocket(socketNum);

    // Assumes search() yields NULL for an unknown socket - TODO confirm.
    if (context == NULL) {
        return;
    }

    if (WaitForSingleObject(context->hCloseEvent, 1000*5) == WAIT_TIMEOUT) {
        printf("CloseEvent Wait Timeout !!!\n");
    }
    contextList.remove(context);
}
< Worker Thread >
// Worker thread entry point.
// Loops forever dequeuing completion packets from hCompletionPort. The
// completion key of each packet is the IOContext of the socket it belongs to.
// For every successful completion the application-specific handling runs and
// a new overlapped WSARecvFrom is issued on the same socket. When a completion
// or a receive fails because the socket was closed, the context is marked
// closed and its close event is signalled.
//   lpParam - thread argument supplied to _beginthreadex; not used.
// Returns 0 when the completion port can no longer be waited on.
unsigned int __stdcall WorkerThread(LPVOID lpParam)
{
    (void)lpParam;

    while (1)
    {
        DWORD dwIoSize = 0;
        LPOVERLAPPED lpOverlapped = NULL;
        IOContext *context = NULL;
        int err;

        BOOL bSuccess = GetQueuedCompletionStatus(hCompletionPort, &dwIoSize, (PULONG_PTR)&context, &lpOverlapped, INFINITE);

        if (!bSuccess && lpOverlapped == NULL) {
            // No packet was dequeued, so the completion key was not written.
            // With an INFINITE timeout this means the port itself is unusable;
            // leave the loop instead of spinning or touching `context`.
            printf("[%s] GetQueuedCompletionStatus error: %d\r\n", __FUNCTION__, (int)GetLastError());
            break;
        }
        if (context == NULL) continue;   // packet without a context: nothing to service
        if (context->bClosed) continue;  // socket already reported as closed

        if (!bSuccess) {
            err = (int)GetLastError();
            if (err == WSA_OPERATION_ABORTED || err == WSAENOTSOCK) {
                // The pending receive was cancelled or the socket is gone:
                // tell whoever waits on hCloseEvent that the socket is closed.
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
            continue;
        }

        // DO SOMETHING
        // ...... (application-specific handling of the completed I/O goes here)

        // Re-arm the socket with a new overlapped receive.
        // NOTE(review): fromAddress, addressSize and flag are locals, but an
        // overlapped WSARecvFrom writes to them when the operation completes.
        // They should live in IOContext so they stay valid until then.
        struct sockaddr_in fromAddress;
        INT addressSize = sizeof(fromAddress);
        DWORD bytesRead = 0;
        DWORD flag = 0;

        context->wsaBuf.len = sizeof(context->buf);
        context->wsaBuf.buf = context->buf;
        if (WSARecvFrom(context->socket, &context->wsaBuf, 1, &bytesRead, &flag, (struct sockaddr*)&fromAddress, &addressSize, &context->wsaOverlapped, NULL) == SOCKET_ERROR) {
            err = WSAGetLastError();
            if (err != WSA_IO_PENDING) {
                printf("[%s] WSARecvFrom error:%d, sock:%d, bytesRead:%d\r\n", __FUNCTION__, err, context->socket, (int)bytesRead);
            }
            if (err == WSAENOTSOCK) { // invalid socket (Socket operation on nonsocket.)
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
        }
    }
    return 0;
}
댓글 없음:
댓글 쓰기