2015년 11월 26일 목요일

live555 TaskScheduler 수정소스

live555의 TaskScheduler를 사용하기 쉽게 수정한 소스이다. 윈도우/리눅스 모두에 적용 가능하며, 소켓 통신 프로그램을 개발할 때 간편하게 사용할 수 있다.
Mutex/Thread 관련 소스는 http://greenday96.blogspot.com/2015/08/blog-post.html 포스트를 참조한다.

< NetCommon.h >
 // NetCommon.h: platform shim that lets the socket code use a single set of
 // names whether it is built against Winsock (WIN32 defined) or BSD sockets.
 #ifndef __NETCOMMON_H__  
 #define __NETCOMMON_H__  
   
 #ifdef WIN32  
   
 #include <WinSock2.h>  
 #include <ws2tcpip.h>  
   
 // Map the POSIX-style names used by the code onto their Winsock counterparts.
 #define closeSocket     closesocket  
 #define EWOULDBLOCK     WSAEWOULDBLOCK  
 // Note: EINPROGRESS resolves to the same value as EWOULDBLOCK on this branch.
 #define EINPROGRESS WSAEWOULDBLOCK  
 #define EINTR          WSAEINTR  
   
 // NOTE(review): despite its name, _strcasecmp expands to the length-limited
 // comparison (_strnicmp here, strncasecmp in the #else branch), so call sites
 // presumably pass a length as third argument -- confirm against callers.
 #define _strcasecmp     _strnicmp  
 #define snprintf     _snprintf  
   
 #else  
   
 #include <sys/socket.h>  
 #include <netinet/in.h>  
 #include <netinet/tcp.h>  
 #include <arpa/inet.h>  
 #include <unistd.h>  
 #include <fcntl.h>  
 #include <errno.h>  
   
 // Map the Winsock-style names used by the code onto their POSIX counterparts.
 #define closeSocket               close  
 // Lets shared code read the last socket error the Winsock way; here it is errno.
 #define WSAGetLastError()     errno  
   
 #include <ctype.h>  
 #include <stdlib.h>  
 // NOTE(review): strncasecmp is normally declared in <strings.h>, which is not
 // included in this header -- confirm it is reachable on the target platform.
 #define _strcasecmp strncasecmp  
 #endif  
   
 #endif  
   


< TaskScheduler.h >
 #ifndef __TASK_SCHEDULER_H__  
 #define __TASK_SCHEDULER_H__  
   
 #include "NetCommon.h"  
 #include "Mutex.h"  
 #include "Thread.h"  
   
 // Bit flags for the `mask` argument of TaskScheduler::BackgroundHandlerProc.
 // The scheduler code in TaskScheduler.cpp only ever passes SOCKET_READABLE;
 // the other two flags are defined but not used by it.
 #define SOCKET_READABLE  (1<<1)  
 #define SOCKET_WRITABLE  (1<<2)  
 #define SOCKET_EXCEPTION  (1<<3)  
   
 class HandlerSet;  
   
 // select()-based dispatcher for readable sockets.
 //
 // Sockets are registered together with a callback; an event loop (normally
 // run on a background thread created by startEventLoop()) waits on all
 // registered sockets with select() and invokes the callback of each socket
 // that became readable.
 //
 // Registration changes and each loop iteration are serialized through fMutex.
 // The class owns a thread, a mutex and a heap-allocated HandlerSet, so it is
 // deliberately non-copyable.
 class TaskScheduler
 {
 public:
      TaskScheduler();
      virtual ~TaskScheduler();

      // Callback invoked from the event loop.
      //   clientData - the opaque pointer given at registration time.
      //   mask       - event flags; the loop passes SOCKET_READABLE.
      typedef void BackgroundHandlerProc(void* clientData, int mask);

      // Registers (or replaces) the read handler for socketNum.
      void turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void *clientData);
      // Unregisters the read handler for socketNum, if any.
      void turnOffBackgroundReadHandling(int socketNum);

      // Starts the event loop on a new thread. Returns 0 on success, -1 if the
      // loop is already running or the thread could not be created.
      int startEventLoop();
      // Asks the loop to finish and waits for the thread to end.
      void stopEventLoop();
      // Runs SingleStep() repeatedly on the calling thread until the loop flag
      // is cleared.
      void doEventLoop();

      // Non-zero while the event loop is supposed to be running.
      int isRunning() const { return fTaskLoop; }

 protected:
      // One iteration of the loop: wait for readability, dispatch handlers.
      virtual void SingleStep();
      void taskLock();
      void taskUnlock();

 private:
      // Not copyable: a copy would share fThread/fMutex and delete
      // fReadHandlers twice. Declared but intentionally never defined.
      TaskScheduler(const TaskScheduler&);
      TaskScheduler& operator=(const TaskScheduler&);

 protected:
      int                         fTaskLoop;     // loop-run flag (1 = keep running)
      MUTEX                    fMutex;           // guards fReadSet, fReadHandlers, fMaxNumSockets
      THREAD                    fThread;         // event loop thread, NULL when not created

      HandlerSet     *fReadHandlers;             // owned; socket -> handler registry
      int               fLastHandledSocketNum;   // not referenced by the scheduler code shown here

      int          fMaxNumSockets;               // highest registered socket number + 1 (select() nfds)
      fd_set     fReadSet;                       // sockets currently watched for readability
 };
   
 // One node of the handler registry: binds a socket number to its callback
 // and the opaque pointer handed back to that callback.
 //
 // The constructor and destructor are private (class default access), so only
 // the friends HandlerSet and HandlerIterator can create or destroy nodes.
 // Construction links the node into the list and destruction unlinks it.
 class HandlerDescriptor {  
      // Links the new node immediately before nextHandler; passing the node's
      // own address creates a self-linked (empty-list sentinel) node.
      HandlerDescriptor(HandlerDescriptor* nextHandler);  
      virtual ~HandlerDescriptor();  
   
 public:  
      int socketNum;  
      TaskScheduler::BackgroundHandlerProc* handlerProc;  
      void* clientData;  
   
 private:  
      // Descriptors are linked together in a doubly-linked list:  
      friend class HandlerSet;  
      friend class HandlerIterator;  
      HandlerDescriptor* fNextHandler;  
      HandlerDescriptor* fPrevHandler;  
 };  
   
 // Registry of socket handlers, stored as a circular doubly-linked list whose
 // sentinel node is the embedded member fHandlers.
 // The class does no locking of its own; in the code shown, TaskScheduler
 // calls it while holding its mutex.
 class HandlerSet {  
 public:  
      HandlerSet();  
      virtual ~HandlerSet();  
   
      // Creates the entry for socketNum if missing, then (re)sets its callback
      // and client data.
      void assignHandler(int socketNum, TaskScheduler::BackgroundHandlerProc* handlerProc, void* clientData);  
      // Deletes the entry for socketNum; does nothing if there is none.
      void removeHandler(int socketNum);  
      // Re-keys the entry of oldSocketNum to newSocketNum, if such an entry exists.
      void moveHandler(int oldSocketNum, int newSocketNum);  
   
 private:  
      // Linear search; returns NULL when socketNum has no entry.
      HandlerDescriptor* lookupHandler(int socketNum);  
   
 private:  
      friend class HandlerIterator;  
      // Sentinel node: the list is empty when it links to itself.
      HandlerDescriptor fHandlers;  
 };  
   
 // Forward iterator over the entries of a HandlerSet (the sentinel node is
 // never returned). next() pre-fetches the following node before returning the
 // current one, so the node just returned may be deleted by the caller without
 // breaking the iteration.
 class HandlerIterator {  
 public:  
      HandlerIterator(HandlerSet& handlerSet);  
      virtual ~HandlerIterator();  
   
      HandlerDescriptor* next(); // returns NULL if none  
      // Rewinds to the first entry of the set.
      void reset();  
   
 private:  
      HandlerSet& fOurSet;  
      // Node that the next call to next() will return (the sentinel when done).
      HandlerDescriptor* fNextPtr;  
 };  
   
 #endif  
   


< TaskScheduler.cpp >
 #include "TaskScheduler.h"  
 #include "RTSPCommonEnv.h"  
 #include <stdio.h>  
   
 // Thread entry point passed to THREAD_CREATE by startEventLoop().
 // lpParam is the TaskScheduler whose loop this thread runs; the thread ends
 // (returning 0) as soon as doEventLoop() returns.
 THREAD_FUNC DoEventThread(void* lpParam)
 {
      static_cast<TaskScheduler*>(lpParam)->doEventLoop();
      return 0;
 }
   
 // Creates an idle scheduler: no sockets registered, no thread running.
 // Every data member is given a defined value here; previously
 // fLastHandledSocketNum was left uninitialized.
 TaskScheduler::TaskScheduler()
 : fTaskLoop(0),
   fReadHandlers(new HandlerSet()),
   fLastHandledSocketNum(-1),   // -1: no socket handled yet
   fMaxNumSockets(0)
 {
      MUTEX_INIT(&fMutex);
      FD_ZERO(&fReadSet);
      fThread = NULL;             // "no thread" marker tested by startEventLoop()
 }
   
 // Stops the event loop thread (if any), then releases the handler registry
 // and the mutex.
 TaskScheduler::~TaskScheduler()
 {
      // stopEventLoop() both joins and destroys the thread, so the thread
      // handle must not be destroyed a second time here.
      stopEventLoop();

      // Safe only after the loop thread has ended: it iterates this set.
      delete fReadHandlers;

      MUTEX_DESTROY(&fMutex);
 }
   
 // Acquires fMutex through the MUTEX_LOCK macro (defined outside this file).
 // Held by the registration methods and by SingleStep() for their whole body.
 void TaskScheduler::taskLock()  
 {  
      MUTEX_LOCK(&fMutex);  
 }  
   
 // Releases fMutex through the MUTEX_UNLOCK macro; counterpart of taskLock().
 void TaskScheduler::taskUnlock()  
 {  
      MUTEX_UNLOCK(&fMutex);  
 }  
   
 // Registers handlerProc/clientData as the read handler of socketNum and adds
 // the socket to the set watched by the event loop. Registering a socket that
 // already has a handler replaces that handler.
 //
 // Invalid socket numbers are ignored silently (the function has no way to
 // report an error): negative values and, on non-Winsock builds, values that
 // do not fit in an fd_set.
 void TaskScheduler::turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* handlerProc, void *clientData)
 {
      if (socketNum < 0) return;

 #ifndef WIN32
      // A POSIX fd_set is a fixed-size bitmap indexed by descriptor number:
      // FD_SET with a descriptor >= FD_SETSIZE writes outside of it.
      // (Winsock's fd_set is a list of handles, so this bound does not apply.)
      if (socketNum >= FD_SETSIZE) return;
 #endif

      taskLock();

      FD_SET((unsigned)socketNum, &fReadSet);
      fReadHandlers->assignHandler(socketNum, handlerProc, clientData);

      // select() needs "highest descriptor + 1" as its first argument.
      if (socketNum + 1 > fMaxNumSockets) {
           fMaxNumSockets = socketNum + 1;
      }

      taskUnlock();
 }
   
 // Unregisters the read handler of socketNum and stops watching the socket.
 // Negative socket numbers are ignored; unknown sockets are harmless.
 //
 // Written without `goto`: the previous `goto exit` jumped over the
 // initialization of local objects that were still in scope at the label,
 // which C++ forbids (GCC and Clang reject it).
 void TaskScheduler::turnOffBackgroundReadHandling(int socketNum)
 {
      if (socketNum < 0) return;

      taskLock();

 #ifndef WIN32
      // Same bitmap bound as for FD_SET: never touch bits outside the fd_set.
      if (socketNum < FD_SETSIZE)
 #endif
      {
           FD_CLR((unsigned)socketNum, &fReadSet);
      }
      fReadHandlers->removeHandler(socketNum);

      // Recompute "highest registered descriptor + 1" from the handlers that
      // remain; it becomes 0 when none is left.
      int maxSocketNum = 0;
      HandlerIterator iter(*fReadHandlers);
      HandlerDescriptor* handler;
      while ((handler = iter.next()) != NULL) {
           if (handler->socketNum + 1 > maxSocketNum) {
                maxSocketNum = handler->socketNum + 1;
           }
      }
      fMaxNumSockets = maxSocketNum;

      taskUnlock();
 }
   
 // Launches the event loop on a new thread running DoEventThread.
 // Returns 0 when the thread was created, -1 when the loop is already marked
 // as running or when thread creation failed (the run flag is then cleared).
 int TaskScheduler::startEventLoop()
 {
      if (fTaskLoop) {
           return -1;
      }

      // Raise the flag first: doEventLoop() exits as soon as it reads 0.
      fTaskLoop = 1;
      THREAD_CREATE(&fThread, DoEventThread, this);

      if (fThread) {
           return 0;
      }

      DPRINTF("failed to create event loop thread\n");
      fTaskLoop = 0;
      return -1;
 }
   
 // Requests the event loop to finish and waits for its thread to end.
 //
 // Safe to call when no thread was ever started and safe to call repeatedly
 // (the destructor calls it as well): the thread is joined and destroyed only
 // if one exists, and fThread is reset afterwards so the same handle is never
 // joined or destroyed twice.
 //
 // NOTE(review): this joins the loop thread, so calling it from a socket
 // handler (i.e. from the loop thread itself) would presumably never return
 // -- confirm against the THREAD_JOIN implementation.
 void TaskScheduler::stopEventLoop()
 {
      fTaskLoop = 0;   // doEventLoop() leaves after its current SingleStep()

      if (fThread) {
           THREAD_JOIN(&fThread);
           THREAD_DESTROY(&fThread);
           fThread = NULL;
      }
 }
   
 // Body of the event loop: keeps executing SingleStep() for as long as the
 // run flag fTaskLoop is non-zero. The flag is re-read before every step.
 void TaskScheduler::doEventLoop()
 {
      for (;;) {
           if (fTaskLoop == 0) {
                break;
           }
           SingleStep();
      }
 }
   
 // One iteration of the event loop: waits up to one second for any registered
 // socket to become readable, then calls the handler of every readable socket
 // with SOCKET_READABLE.
 //
 // The mutex is held while waiting and while handlers run, so registration
 // calls from other threads block until the step is over.
 // NOTE(review): a handler that calls turnOn/turnOffBackgroundReadHandling
 // re-enters taskLock() on the same thread; whether that is safe depends on
 // MUTEX being recursive, which is not visible here -- confirm.
 void TaskScheduler::SingleStep()
 {
      taskLock();

      if (fMaxNumSockets <= 0) {
           // Nothing is registered. Winsock's select() fails immediately with
           // WSAEINVAL when every descriptor set is empty, which turned the
           // loop into a busy spin of error logs. Idle briefly instead, with
           // the mutex released so that sockets can be registered meanwhile.
           taskUnlock();
 #ifdef WIN32
           Sleep(100);
 #else
           struct timeval idle;
           idle.tv_sec = 0;
           idle.tv_usec = 100 * 1000;
           select(0, NULL, NULL, NULL, &idle);   // portable sub-second sleep
 #endif
           return;
      }

      // select() modifies the set it is given, so work on a copy.
      fd_set readSet = fReadSet;

      struct timeval timeout;
      timeout.tv_sec = 1;   // bounds how long stopEventLoop() has to wait
      timeout.tv_usec = 0;

      int selectResult = select(fMaxNumSockets, &readSet, NULL, NULL, &timeout);
      if (selectResult < 0) {
           int err = WSAGetLastError();
           // Being interrupted by a signal is not a failure; just retry on the
           // next step without logging.
           if (err != EINTR) {
                DPRINTF("TaskScheduler::SingleStep(): select() fails : %d\n", err);
           }
           taskUnlock();
           return;
      }

      HandlerIterator iter(*fReadHandlers);
      HandlerDescriptor* handler;

      while ((handler = iter.next()) != NULL) {
           if (FD_ISSET(handler->socketNum, &readSet) && handler->handlerProc != NULL) {
                (*handler->handlerProc)(handler->clientData, SOCKET_READABLE);
           }
      }

      taskUnlock();
 }
   
   
 // Creates a descriptor and links it into a circular doubly-linked list
 // immediately before nextHandler.
 //
 // Passing the address of the object under construction (as HandlerSet does
 // for its sentinel) creates a one-node list that links to itself.
 //
 // All data members now start with a defined value (socketNum -1, no
 // callback, no client data); previously socketNum and clientData were left
 // uninitialized until the caller assigned them.
 HandlerDescriptor::HandlerDescriptor(HandlerDescriptor* nextHandler)
 : socketNum(-1), handlerProc(NULL), clientData(NULL) {
      // Link this descriptor into a doubly-linked list:
      if (nextHandler == this) { // initialization
           fNextHandler = fPrevHandler = this;
      } else {
           fNextHandler = nextHandler;
           fPrevHandler = nextHandler->fPrevHandler;
           nextHandler->fPrevHandler = this;
           fPrevHandler->fNextHandler = this;
      }
 }
   
 // Removes this node from its list by making its two neighbours point at
 // each other. For a self-linked node this changes nothing.
 HandlerDescriptor::~HandlerDescriptor() {
      HandlerDescriptor* const before = fPrevHandler;
      HandlerDescriptor* const after = fNextHandler;

      before->fNextHandler = after;
      after->fPrevHandler = before;
 }
   
 // Builds an empty set. The sentinel fHandlers is constructed with its own
 // address, which selects the "initialization" branch of the
 // HandlerDescriptor constructor and leaves the sentinel linked to itself.
 HandlerSet::HandlerSet()  
 : fHandlers(&fHandlers) {  
      fHandlers.socketNum = -1; // shouldn't ever get looked at, but in case...  
 }  
   
 // Destroys every registered descriptor. Deleting a descriptor unlinks it,
 // so the first element is re-read from the sentinel after each deletion
 // until the sentinel points back at itself.
 HandlerSet::~HandlerSet() {
      for (HandlerDescriptor* first = fHandlers.fNextHandler;
           first != &fHandlers;
           first = fHandlers.fNextHandler) {
           delete first;
      }
 }
   
 void HandlerSet  
 ::assignHandler(int socketNum, TaskScheduler::BackgroundHandlerProc* handlerProc, void* clientData) {  
      // First, see if there's already a handler for this socket:  
      HandlerDescriptor* handler = lookupHandler(socketNum);  
      if (handler == NULL) { // No existing handler, so create a new descr:  
           handler = new HandlerDescriptor(fHandlers.fNextHandler);  
           handler->socketNum = socketNum;  
      }  
   
      handler->handlerProc = handlerProc;  
      handler->clientData = clientData;  
 }  
   
 // Drops the descriptor registered for socketNum. When there is none,
 // lookupHandler() yields NULL and deleting NULL has no effect.
 void HandlerSet::removeHandler(int socketNum) {
      delete lookupHandler(socketNum);
 }
   
 void HandlerSet::moveHandler(int oldSocketNum, int newSocketNum) {  
      HandlerDescriptor* handler = lookupHandler(oldSocketNum);  
      if (handler != NULL) {  
           handler->socketNum = newSocketNum;  
      }  
 }  
   
 // Linear search through the list, starting after the sentinel.
 // Returns the first descriptor whose socketNum matches, or NULL when the
 // whole list was walked without a match.
 HandlerDescriptor* HandlerSet::lookupHandler(int socketNum) {
      for (HandlerDescriptor* node = fHandlers.fNextHandler;
           node != &fHandlers;
           node = node->fNextHandler) {
           if (node->socketNum == socketNum) {
                return node;
           }
      }
      return NULL;
 }
   
 // Binds the iterator to handlerSet and positions it on the set's first
 // descriptor (the node following the sentinel), exactly as reset() does.
 HandlerIterator::HandlerIterator(HandlerSet& handlerSet)
 : fOurSet(handlerSet),
   fNextPtr(handlerSet.fHandlers.fNextHandler) {
 }
   
 // Nothing to release: the iterator only holds a reference to the set and a
 // pointer into it, and owns neither.
 HandlerIterator::~HandlerIterator() {  
 }  
   
 // Rewinds the iterator to the node that follows the sentinel, i.e. the first
 // descriptor of the set (or the sentinel itself when the set is empty).
 void HandlerIterator::reset() {  
      fNextPtr = fOurSet.fHandlers.fNextHandler;  
 }  
   
 // Returns the descriptor at the current position and advances past it, or
 // returns NULL (without moving) once the position is back on the sentinel.
 // The iterator has already stepped to the following node when this returns,
 // so the caller may delete the returned descriptor.
 HandlerDescriptor* HandlerIterator::next() {
      HandlerDescriptor* const current = fNextPtr;
      if (current == &fOurSet.fHandlers) { // wrapped around: no more entries
           return NULL;
      }

      fNextPtr = current->fNextHandler;
      return current;
 }
   


< TaskScheduler 사용 >

TaskScheduler* fTask = new TaskScheduler();
...
fTask->turnOnBackgroundReadHandling(sock, &incomingHandler, this); // 소켓 핸들러 등록
...
fTask->turnOffBackgroundReadHandling(sock); // 소켓 핸들러 해제
delete fTask;

static void incomingHandler(void *data, int)
{
    recvfrom();
    ...
}

댓글 없음:

댓글 쓰기