2011년 10월 14일 금요일

IOCP로 UDP 데이터 수신 - How to receive UDP data with IOCP and WSARecvFrom


< 변수들 - variables >

// IOCP 컨텍스트 정의
class IOContext {
WSAOVERLAPPED wsaOverlapped;
int socket;
WSABUF wsaBuf;
char buf[10240];
HANDLE hCompletionPort;
    HANDLE hCloseEvent;
    BOOL bClosed;

    IOContext(int sock);
    ~IOContext();

};


// Builds the per-socket context for `sock`.
//
// Every member is given a defined value: previously hCompletionPort was left
// uninitialized until registerSocket assigned it. The close event is an
// unnamed auto-reset event that starts non-signalled.
IOContext::IOContext(int sock)
    : socket(sock),
      hCompletionPort(NULL),
      hCloseEvent(NULL),
      bClosed(FALSE)
{
    SecureZeroMemory((PVOID)&wsaOverlapped, sizeof(WSAOVERLAPPED));

    // Point the WSABUF at the embedded receive buffer.
    wsaBuf.len = sizeof(buf);
    wsaBuf.buf = buf;

    hCloseEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (hCloseEvent == NULL) {
        // Without the event, unregisterSocket cannot wait for the worker's
        // close notification; report it instead of failing silently.
        printf("[%s] CreateEvent error: %lu\r\n", __FUNCTION__, GetLastError());
    }
}

// Releases the close event owned by this context.
IOContext::~IOContext()
{
    // CreateEvent may have failed and left the handle NULL. CloseHandle on an
    // invalid handle fails, and raises an exception when run under a debugger,
    // so only close a handle we actually own.
    if (hCloseEvent != NULL) {
        CloseHandle(hCloseEvent);
        hCloseEvent = NULL;
    }
}


// Shared state used by init/start, registerSocket/unregisterSocket and
// WorkerThread.
List<IOContext> contextList;    // IOCP contexts, managed as a linked list
// The completion port created in init(); sockets are associated with it in
// registerSocket and the worker threads dequeue from it.
HANDLE hCompletionPort;
// Filled by GetSystemInfo in init(); only dwNumberOfProcessors is read.
SYSTEM_INFO systemInfo;
// Number of worker threads that start() creates.
DWORD dwThreadCount;
// Worker thread handles stored by start(). Fixed capacity of 16, so
// dwThreadCount must never exceed 16.
HANDLE hThreads[16];


< IOCP 초기화 및 쓰레드 생성 - initialize the IOCP handle and create the worker threads >

void init()
{

    hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    GetSystemInfo(&systemInfo);
    dwThreadCount = systemInfo.dwNumberOfProcessors*2;
 
    start();
}

void start()
{
    for (int i=0; i < dwThreadCount; i++)
    {
  hThreads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, this, 0, NULL);
    }
}

< IOCP에 소켓 등록 - register a socket with the IOCP handle >

void registerSocket(int socketNum)
{
     IOContext *context = new IOContext(socketNum);

     contextList.insert(context);
     context->hCompletionPort = CreateIoCompletionPort((HANDLE)socketNum, hCompletionPort, (ULONG_PTR)context, 0);

int err;
if (!PostQueuedCompletionStatus(hCompletionPort, 0, (ULONG_PTR)context, &context->wsaOverlapped)) {
  if ((err = WSAGetLastError()) != WSA_IO_PENDING)
    printf("[%s] PostQueuedCompletionStatus error: %d\r\n", err);
}
}


< IOCP에서 소켓 등록 해제 - unregister a socket from the IOCP handle >

void unregisterSocket(int socketNum)
{
     IOContext *context = (IOContext *)contextList.search(socketNum);
     closesocket(socketNum);
     if (WaitForSingleObject(context->hCloseEvent, 1000*5) == WAIT_TIMEOUT) {
         printf("CloseEvent Wait Timeout !!!\n");
     }
     contextList.remove(context);
}


< Worker Thread >

unsigned int __stdcall WorkerThread(LPVOID lpParam)
{
    BOOL bSuccess = FALSE;
    DWORD dwIoSize;
    LPOVERLAPPED lpOverlapped = NULL;
    IOContext *context;

    while (1)
    {
        bSuccess = GetQueuedCompletionStatus(hCompletionPort, &dwIoSize, (PULONG_PTR)&context, &lpOverlapped, INFINITE);

if (context->bClosed) continue;

  if (!bSuccess) {
    err = GetLastError();
    if (err == WSA_OPERATION_ABORTED) {
                context->bClosed = TRUE;
SetEvent(context->hCloseEvent);         
            } else if (err == WSAENOTSOCK) {
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
            continue;
        }

        // DO SOMETHING
        ......
  struct sockaddr_in fromAddress;
  int len = sizeof(context->buf);
  int addressSize = sizeof(fromAddress);
  int bytesRead;
  DWORD flag = 0;
  int err;

  context->wsaBuf.len = sizeof(buf);
  context->wsaBuf.buf = context->buf;
  if (WSARecvFrom(context->socket, &context->wsaBuf, 1, (LPDWORD)&bytesRead, (LPDWORD)&flag, (struct sockaddr*)&fromAddress, (socklen_t *)&addressSize, &context->wsaOverlapped, NULL) == SOCKET_ERROR) {
    if ((err=WSAGetLastError()) != WSA_IO_PENDING) {
      printf("[%s] WSARecvFrom error:%d, sock:%d, bytesRead:%d\r\n", __FUNCTION__, err, context->socket, bytesRead);
    }

            if (err == WSAENOTSOCK) { // invalid socket (Socket operation on nonsocket.)
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
  }
    }
    return 0;
}


댓글 없음:

댓글 쓰기