네트웍 프로그램에서 대량의 트래픽이나 클라이언트/서버를 상대로 데이터를 송수신할때 보통 단일쓰레드로 select 함수를 사용해서 처리한다.
리눅스에서는 이것이 보통 합당한 방식인데 왜냐하면 멀티코어 cpu 상에서 cpu 사용률을 보면 각 cpu가 골고루 사용되는것을 볼수 있다. 이것은 이더넷 카드의 인터럽트가 각 cpu 에 골고루 신호를 보내기때문이다.
하지만 윈도우에서 같은 방식으로 싱글쓰레드 select 함수를 사용하면 cpu 1개가 100% 로 치솟고 나머지 cpu 들은 거의 0% 로 놀고있는 현상을 볼수있다. 이것은 이더넷 카드의 인터럽트가 쿼드코어기준으로 4개의 cpu 에 골고루 인터럽트를 보내지않고 1개의 cpu 에게만 집중적으로 보내기 때문이다. 이것을 막기위해 레지스트리 수정등 여러방법을 써봤지만 다 효과가 없었고 오직 IOCP 기술을 사용해서 부하분산을 할 수 있었다.
IO Completion Port 는 일종의 IO 큐로써 등록된 각 소켓 디스크립터들의 이벤트들을 담고있고 초기에 사용자가 만들었던 쓰레드풀에서 각 쓰레드들이 랜덤하게 큐의 이벤트들을 pop 해서 가져가서 처리하는 방식이다.
IO Completion Port 가 일종의 select 함수 역할을 하는 것이고 이벤트에 대한 처리는 각 쓰레드들이 알아서 하는 방식이다. 이렇게 하면 쓰레드들이 골고루 일을 하기때문에 cpu 부하분산을 시킬수 있다.
2011년 10월 18일 화요일
2011년 10월 14일 금요일
IOCP로 UDP 데이터 수신 - how to use IOCP WSARecvFrom
< 변수들 - variables >
// IOCP 컨텍스트 정의
class IOContext {
WSAOVERLAPPED wsaOverlapped;
int socket;
WSABUF wsaBuf;
char buf[10240];
HANDLE hCompletionPort;
HANDLE hCloseEvent;
BOOL bClosed;
IOContext(int sock);
~IOContext();
};
IOContext::IOContext(int sock)
{
socket = sock;
wsaBuf.len = sizeof(buf);
wsaBuf.buf = buf;
SecureZeroMemory((PVOID)&wsaOverlapped, sizeof(WSAOVERLAPPED));
hCloseEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
bClosed = FALSE;
}
IOContext::~IOContext()
{
CloseHandle(hCloseEvent);
}
List<IOContext> contextList; // IOCP 컨텍스트들 링크드 리스트로 관리
HANDLE hCompletionPort;
SYSTEM_INFO systemInfo;
DWORD dwThreadCount;
HANDLE hThreads[16];
< IOCP 초기화 및 쓰레드 생성 - initialize IOCP Handle and create worker threads >
void init()
{
hCompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
GetSystemInfo(&systemInfo);
dwThreadCount = systemInfo.dwNumberOfProcessors*2;
start();
}
void start()
{
for (int i=0; i < dwThreadCount; i++)
{
hThreads[i] = (HANDLE)_beginthreadex(NULL, 0, WorkerThread, this, 0, NULL);
}
}
< IOCP 에 소켓등록 - register socket to IOCP Handle >
void registerSocket(int socketNum)
{
IOContext *context = new IOContext(socketNum);
contextList.insert(context);
context->hCompletionPort = CreateIoCompletionPort((HANDLE)socketNum, hCompletionPort, (ULONG_PTR)context, 0);
int err;
if (!PostQueuedCompletionStatus(hCompletionPort, 0, (ULONG_PTR)context, &context->wsaOverlapped)) {
if ((err = WSAGetLastError()) != WSA_IO_PENDING)
printf("[%s] PostQueuedCompletionStatus error: %d\r\n", err);
}
}
< IOCP 에 소켓등록해제 - unregister socket from IOCP Handle >
void unregisterSocket(int socketNum)
{
IOContext *context = (IOContext *)contextList.search(socketNum);
closesocket(socketNum);
if (WaitForSingleObject(context->hCloseEvent, 1000*5) == WAIT_TIMEOUT) {
printf("CloseEvent Wait Timeout !!!\n");
}
contextList.remove(context);
IOContext *context = (IOContext *)contextList.search(socketNum);
closesocket(socketNum);
if (WaitForSingleObject(context->hCloseEvent, 1000*5) == WAIT_TIMEOUT) {
printf("CloseEvent Wait Timeout !!!\n");
}
contextList.remove(context);
}
< Worker Thread >
unsigned int __stdcall WorkerThread(LPVOID lpParam)
{
BOOL bSuccess = FALSE;
DWORD dwIoSize;
LPOVERLAPPED lpOverlapped = NULL;
IOContext *context;
while (1)
{
bSuccess = GetQueuedCompletionStatus(hCompletionPort, &dwIoSize, (PULONG_PTR)&context, &lpOverlapped, INFINITE);
if (context->bClosed) continue;
if (context->bClosed) continue;
if (!bSuccess) {
err = GetLastError();
if (err == WSA_OPERATION_ABORTED) {
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
} else if (err == WSAENOTSOCK) {
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
}
continue;
}
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
} else if (err == WSAENOTSOCK) {
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
}
continue;
}
// DO SOMETHING
......
struct sockaddr_in fromAddress;
int len = sizeof(context->buf);
int addressSize = sizeof(fromAddress);
int bytesRead;
DWORD flag = 0;
int err;
context->wsaBuf.len = sizeof(buf);
context->wsaBuf.buf = context->buf;
if (WSARecvFrom(context->socket, &context->wsaBuf, 1, (LPDWORD)&bytesRead, (LPDWORD)&flag, (struct sockaddr*)&fromAddress, (socklen_t *)&addressSize, &context->wsaOverlapped, NULL) == SOCKET_ERROR) {
if ((err=WSAGetLastError()) != WSA_IO_PENDING) {
printf("[%s] WSARecvFrom error:%d, sock:%d, bytesRead:%d\r\n", __FUNCTION__, err, context->socket, bytesRead);
}
if (err == WSAENOTSOCK) { // invalid socket (Socket operation on nonsocket.)
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
}
if (err == WSAENOTSOCK) { // invalid socket (Socket operation on nonsocket.)
context->bClosed = TRUE;
SetEvent(context->hCloseEvent);
}
}
}
return 0;
return 0;
}
2011년 10월 11일 화요일
MySQL 접속권한 주기
mysql> GRANT ALL ON new_aces_server.* TO root@'172.16.1.15' IDENTIFIED BY 'acest';
피드 구독하기:
글 (Atom)