2011년 10월 18일 화요일

윈도우에서 IOCP 를 사용해야하는 이유

네트웍 프로그램에서 대량의 트래픽이나 클라이언트/서버를 상대로 데이터를 송수신할때 보통 단일쓰레드로 select 함수를 사용해서 처리한다.
리눅스에서는 이것이 보통 합당한 방식인데 왜냐하면 멀티코어 cpu 상에서 cpu 사용률을 보면 각 cpu가 골고루 사용되는것을 볼수 있다. 이것은 이더넷 카드의 인터럽트가 각 cpu 에 골고루 신호를 보내기때문이다.
하지만 윈도우에서 같은 방식으로 싱글쓰레드 select 함수를 사용하면 cpu 1개가 100% 로 치솟고 나머지 cpu 들은 거의 0% 로 놀고있는 현상을 볼수있다. 이것은 이더넷 카드의 인터럽트가 쿼드코어기준으로 4개의 cpu 에 골고루 인터럽트를 보내지않고 1개의 cpu 에게만 집중적으로 보내기 때문이다. 이것을 막기위해 레지스트리 수정등 여러방법을 써봤지만 다 효과가 없었고 오직 IOCP 기술을 사용해서 부하분산을 할 수 있었다.
IO Completion Port 는 일종의 IO 큐로써 등록된 각 소켓 디스크립터들의 이벤트들을 담고있고 초기에 사용자가 만들었던 쓰레드풀에서 각 쓰레드들이 랜덤하게 큐의 이벤트들을 pop 해서 가져가서 처리하는 방식이다.
IO Completion Port 가 일종의 select 함수 역할을 하는 것이고 이벤트에 대한 처리는 각 쓰레드들이 알아서 하는 방식이다. 이렇게 하면 쓰레드들이 골고루 일을 하기때문에 cpu 부하분산을 시킬수 있다.

2011년 10월 14일 금요일

IOCP로 UDP 데이터 수신 - how to use IOCP WSARecvFrom


< 변수들 - variables >

// IOCP 컨텍스트 정의
class IOContext {
WSAOVERLAPPED wsaOverlapped;
int socket;
WSABUF wsaBuf;
char buf[10240];
HANDLE hCompletionPort;
    HANDLE hCloseEvent;
    BOOL bClosed;

    IOContext(int sock);
    ~IOContext();

};


IOContext::IOContext(int sock)
{
socket = sock;
wsaBuf.len = sizeof(buf);
wsaBuf.buf = buf;
SecureZeroMemory((PVOID)&wsaOverlapped, sizeof(WSAOVERLAPPED));

hCloseEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
bClosed = FALSE;
}

IOContext::~IOContext()
{
CloseHandle(hCloseEvent);
}


List<IOContext> contextList;    // IOCP 컨텍스트들 링크드 리스트로 관리
HANDLE hCompletionPort;
SYSTEM_INFO systemInfo;
DWORD dwThreadCount;
HANDLE hThreads[16];


< IOCP 초기화 및 쓰레드 생성 - initialize IOCP Handle and create worker threads >

void init()
{

    hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    GetSystemInfo(&systemInfo);
    dwThreadCount = systemInfo.dwNumberOfProcessors*2;
 
    start();
}

void start()
{
    for (int i=0; i < dwThreadCount; i++)
    {
  hThreads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, this, 0, NULL);
    }
}

< IOCP 에 소켓등록 - register socket to IOCP Handle >

void registerSocket(int socketNum)
{
     IOContext *context = new IOContext(socketNum);

     contextList.insert(context);
     context->hCompletionPort = CreateIoCompletionPort((HANDLE)socketNum, hCompletionPort, (ULONG_PTR)context, 0);

int err;
if (!PostQueuedCompletionStatus(hCompletionPort, 0, (ULONG_PTR)context, &context->wsaOverlapped)) {
  if ((err = WSAGetLastError()) != WSA_IO_PENDING)
    printf("[%s] PostQueuedCompletionStatus error: %d\r\n", err);
}
}


< IOCP 에 소켓등록해제 - unregister socket from IOCP Handle >

void unregisterSocket(int socketNum)
{
     IOContext *context = (IOContext *)contextList.search(socketNum);
     closesocket(socketNum);
     if (WaitForSingleObject(context->hCloseEvent, 1000*5) == WAIT_TIMEOUT) {
         printf("CloseEvent Wait Timeout !!!\n");
     }
     contextList.remove(context);
}


< Worker Thread >

unsigned int __stdcall WorkerThread(LPVOID lpParam)
{
    BOOL bSuccess = FALSE;
    DWORD dwIoSize;
    LPOVERLAPPED lpOverlapped = NULL;
    IOContext *context;

    while (1)
    {
        bSuccess = GetQueuedCompletionStatus(hCompletionPort, &dwIoSize, (PULONG_PTR)&context, &lpOverlapped, INFINITE);

if (context->bClosed) continue;

  if (!bSuccess) {
    err = GetLastError();
    if (err == WSA_OPERATION_ABORTED) {
                context->bClosed = TRUE;
SetEvent(context->hCloseEvent);         
            } else if (err == WSAENOTSOCK) {
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
            continue;
        }

        // DO SOMETHING
        ......
  struct sockaddr_in fromAddress;
  int len = sizeof(context->buf);
  int addressSize = sizeof(fromAddress);
  int bytesRead;
  DWORD flag = 0;
  int err;

  context->wsaBuf.len = sizeof(buf);
  context->wsaBuf.buf = context->buf;
  if (WSARecvFrom(context->socket, &context->wsaBuf, 1, (LPDWORD)&bytesRead, (LPDWORD)&flag, (struct sockaddr*)&fromAddress, (socklen_t *)&addressSize, &context->wsaOverlapped, NULL) == SOCKET_ERROR) {
    if ((err=WSAGetLastError()) != WSA_IO_PENDING) {
      printf("[%s] WSARecvFrom error:%d, sock:%d, bytesRead:%d\r\n", __FUNCTION__, err, context->socket, bytesRead);
    }

            if (err == WSAENOTSOCK) { // invalid socket (Socket operation on nonsocket.)
                context->bClosed = TRUE;
                SetEvent(context->hCloseEvent);
            }
  }
    }
    return 0;
}


2011년 10월 11일 화요일

MySQL 접속권한 주기

mysql> GRANT ALL ON new_aces_server.* TO root@'172.16.1.15' IDENTIFIED BY 'acest';