Mutex/Thread 관련 소스는 http://greenday96.blogspot.com/2015/08/blog-post.html 포스트 참조
< NetCommon.h >
#ifndef __NETCOMMON_H__
#define __NETCOMMON_H__
#ifdef WIN32
#include <WinSock2.h>
#include <ws2tcpip.h>
#define closeSocket closesocket
#define EWOULDBLOCK WSAEWOULDBLOCK
#define EINPROGRESS WSAEWOULDBLOCK
#define EINTR WSAEINTR
#define _strcasecmp _strnicmp
#define snprintf _snprintf
#else
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#define closeSocket close
#define WSAGetLastError() errno
#include <ctype.h>
#include <stdlib.h>
#define _strcasecmp strncasecmp
#endif
#endif
< TaskScheduler.h >
#ifndef __TASK_SCHEDULER_H__
#define __TASK_SCHEDULER_H__
#include "NetCommon.h"
#include "Mutex.h"
#include "Thread.h"
#define SOCKET_READABLE (1<<1)
#define SOCKET_WRITABLE (1<<2)
#define SOCKET_EXCEPTION (1<<3)
class HandlerSet;
class TaskScheduler
{
public:
TaskScheduler();
virtual ~TaskScheduler();
typedef void BackgroundHandlerProc(void* clientData, int mask);
void turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void *clientData);
void turnOffBackgroundReadHandling(int socketNum);
int startEventLoop();
void stopEventLoop();
void doEventLoop();
int isRunning() { return fTaskLoop; }
protected:
virtual void SingleStep();
void taskLock();
void taskUnlock();
protected:
int fTaskLoop;
MUTEX fMutex;
THREAD fThread;
HandlerSet *fReadHandlers;
int fLastHandledSocketNum;
int fMaxNumSockets;
fd_set fReadSet;
};
class HandlerDescriptor {
HandlerDescriptor(HandlerDescriptor* nextHandler);
virtual ~HandlerDescriptor();
public:
int socketNum;
TaskScheduler::BackgroundHandlerProc* handlerProc;
void* clientData;
private:
// Descriptors are linked together in a doubly-linked list:
friend class HandlerSet;
friend class HandlerIterator;
HandlerDescriptor* fNextHandler;
HandlerDescriptor* fPrevHandler;
};
class HandlerSet {
public:
HandlerSet();
virtual ~HandlerSet();
void assignHandler(int socketNum, TaskScheduler::BackgroundHandlerProc* handlerProc, void* clientData);
void removeHandler(int socketNum);
void moveHandler(int oldSocketNum, int newSocketNum);
private:
HandlerDescriptor* lookupHandler(int socketNum);
private:
friend class HandlerIterator;
HandlerDescriptor fHandlers;
};
class HandlerIterator {
public:
HandlerIterator(HandlerSet& handlerSet);
virtual ~HandlerIterator();
HandlerDescriptor* next(); // returns NULL if none
void reset();
private:
HandlerSet& fOurSet;
HandlerDescriptor* fNextPtr;
};
#endif
< TaskScheduler.cpp >
#include "TaskScheduler.h"
#include "RTSPCommonEnv.h"
#include <stdio.h>
THREAD_FUNC DoEventThread(void* lpParam)
{
TaskScheduler *scheduler = (TaskScheduler *)lpParam;
scheduler->doEventLoop();
return 0;
}
TaskScheduler::TaskScheduler()
{
fTaskLoop = 0;
MUTEX_INIT(&fMutex);
FD_ZERO(&fReadSet);
fMaxNumSockets = 0;
fThread = NULL;
fReadHandlers = new HandlerSet();
}
TaskScheduler::~TaskScheduler()
{
stopEventLoop();
delete fReadHandlers;
THREAD_DESTROY(&fThread);
MUTEX_DESTROY(&fMutex);
}
void TaskScheduler::taskLock()
{
MUTEX_LOCK(&fMutex);
}
void TaskScheduler::taskUnlock()
{
MUTEX_UNLOCK(&fMutex);
}
void TaskScheduler::turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void *clientData)
{
taskLock();
if (socketNum < 0) goto exit;
FD_SET((unsigned)socketNum, &fReadSet);
fReadHandlers->assignHandler(socketNum, handlerProc, clientData);
if (socketNum+1 > fMaxNumSockets) {
fMaxNumSockets = socketNum+1;
}
exit:
taskUnlock();
}
void TaskScheduler::turnOffBackgroundReadHandling(int socketNum)
{
taskLock();
if (socketNum < 0) goto exit;
FD_CLR((unsigned)socketNum, &fReadSet);
fReadHandlers->removeHandler(socketNum);
HandlerIterator iter(*fReadHandlers);
HandlerDescriptor* handler;
int maxSocketNum = 0;
while ((handler = iter.next()) != NULL) {
if (handler->socketNum+1 > maxSocketNum) {
maxSocketNum = handler->socketNum+1;
}
}
fMaxNumSockets = maxSocketNum;
exit:
taskUnlock();
}
int TaskScheduler::startEventLoop()
{
if (fTaskLoop != 0)
return -1;
fTaskLoop = 1;
THREAD_CREATE(&fThread, DoEventThread, this);
if (!fThread) {
DPRINTF("failed to create event loop thread\n");
fTaskLoop = 0;
return -1;
}
return 0;
}
void TaskScheduler::stopEventLoop()
{
fTaskLoop = 0;
THREAD_JOIN(&fThread);
THREAD_DESTROY(&fThread);
}
void TaskScheduler::doEventLoop()
{
while (fTaskLoop)
{
SingleStep();
}
}
void TaskScheduler::SingleStep()
{
taskLock();
fd_set readSet = fReadSet;
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
int selectResult = select(fMaxNumSockets, &readSet, NULL, NULL, &timeout);
if (selectResult < 0) {
int err = WSAGetLastError();
DPRINTF("TaskScheduler::SingleStep(): select() fails : %d\n", err);
taskUnlock();
return;
}
HandlerIterator iter(*fReadHandlers);
HandlerDescriptor* handler;
while ((handler = iter.next()) != NULL) {
if (FD_ISSET(handler->socketNum, &readSet) && handler->handlerProc != NULL) {
(*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
}
}
taskUnlock();
}
HandlerDescriptor::HandlerDescriptor(HandlerDescriptor* nextHandler)
: handlerProc(NULL) {
// Link this descriptor into a doubly-linked list:
if (nextHandler == this) { // initialization
fNextHandler = fPrevHandler = this;
} else {
fNextHandler = nextHandler;
fPrevHandler = nextHandler->fPrevHandler;
nextHandler->fPrevHandler = this;
fPrevHandler->fNextHandler = this;
}
}
HandlerDescriptor::~HandlerDescriptor() {
// Unlink this descriptor from a doubly-linked list:
fNextHandler->fPrevHandler = fPrevHandler;
fPrevHandler->fNextHandler = fNextHandler;
}
HandlerSet::HandlerSet()
: fHandlers(&fHandlers) {
fHandlers.socketNum = -1; // shouldn't ever get looked at, but in case...
}
HandlerSet::~HandlerSet() {
// Delete each handler descriptor:
while (fHandlers.fNextHandler != &fHandlers) {
delete fHandlers.fNextHandler; // changes fHandlers->fNextHandler
}
}
void HandlerSet
::assignHandler(int socketNum, TaskScheduler::BackgroundHandlerProc* handlerProc, void* clientData) {
// First, see if there's already a handler for this socket:
HandlerDescriptor* handler = lookupHandler(socketNum);
if (handler == NULL) { // No existing handler, so create a new descr:
handler = new HandlerDescriptor(fHandlers.fNextHandler);
handler->socketNum = socketNum;
}
handler->handlerProc = handlerProc;
handler->clientData = clientData;
}
void HandlerSet::removeHandler(int socketNum) {
HandlerDescriptor* handler = lookupHandler(socketNum);
delete handler;
}
void HandlerSet::moveHandler(int oldSocketNum, int newSocketNum) {
HandlerDescriptor* handler = lookupHandler(oldSocketNum);
if (handler != NULL) {
handler->socketNum = newSocketNum;
}
}
HandlerDescriptor* HandlerSet::lookupHandler(int socketNum) {
HandlerDescriptor* handler;
HandlerIterator iter(*this);
while ((handler = iter.next()) != NULL) {
if (handler->socketNum == socketNum) break;
}
return handler;
}
HandlerIterator::HandlerIterator(HandlerSet& handlerSet)
: fOurSet(handlerSet) {
reset();
}
HandlerIterator::~HandlerIterator() {
}
void HandlerIterator::reset() {
fNextPtr = fOurSet.fHandlers.fNextHandler;
}
HandlerDescriptor* HandlerIterator::next() {
HandlerDescriptor* result = fNextPtr;
if (result == &fOurSet.fHandlers) { // no more
result = NULL;
} else {
fNextPtr = fNextPtr->fNextHandler;
}
return result;
}
< TaskScheduler 사용 >
TaskScheduler* fTask = new TaskScheduler();
...
fTask->turnOnBackgroundReadHandling(sock, &incomingHandler, this); // 소켓 핸들러 등록
...
fTask->turnOffBackgroundReadHandling(sock); // 소켓 핸들러 해제
delete fTask;
static void incomingHandler(void *data, int)
{
recvfrom();
...
}
댓글 없음:
댓글 쓰기