Reactor事件驱动模式 一,什么是Reactor模式? 1.它的中文名叫反应器,是事件驱动中的一种(还有另外一种叫Proactor:主动器) 2.在我们写一些比较小的[服务器-客户端]的时候,经常简单地使用BSD Socket(accept,connect,recv,send等等), 但是这样的话,程序的[Socket处理逻辑]和[业务逻辑]就混合在一起,导致2者的耦合性非常高,Reactor模式正是为了 解决这种耦合性而诞生的 3.Reactor(反应器)是相对于调用者而言的,因为调用之后,当有事件,Reactor就会通知我们,我们就去响应事件。 二,Reactor模式的实现(C++): (1).实现类简介: 1.多路复用组件接口:IReactor 2.具体多路复用组件:CSelectReactor(在Windows是用select实现,在Linux可以用epoll等等) 3.事件组件接口:IEvent(事件是和Socket关联在一起的) 4.具体事件组件:CAcceptEvent,CReadEvent (2).实现思路: 1.定义一个具体的多路复用组件,该组件负责: A.多路复用事件 B.注册一个事件到多路复用事件列表中 C.当有事件发生,判断事件的类型(分别有:Read,Write),然后调用分别调用事件的Read事件或者Write事件 2.定义事件,事件都是继承于IEvent接口类,并且提供2个接口: A.ReadEvent接口,当有Read事件的时候,该接口会被调用 B.WriteEvent接口,当有Write事件的时候,该接口被调用 (3).实现: A.多路复用组件接口(IReactor): /////////////类声明/////////////// typedef map<SOCKET,IEvent*> SOCKET_MAP_DEF;//以Socket为映射的IEvent*列表 typedef map<SOCKET,IEvent*>::iterator SOCKET_MAP_ITER_DEF; class IReactor { public: virtual int Init() = 0;//初始化 virtual int Dispatch() = 0;//多路复用Dispatch int RegisterEvent(IEvent* pEvent);//注册一个Event IReactor(); protected: SOCKET_MAP_DEF cEventMap; }; /////////////类实现/////////////// IReactor::IReactor() { WORD wVersionRequested; WSADATA wsaData; int err; wVersionRequested = MAKEWORD( 2,2 ); err = WSAStartup( wVersionRequested,&wsaData ); if ( err != 0 ) { return; } if ( LOBYTE( wsaData.wVersion ) != 2 || HIBYTE( wsaData.wVersion ) != 2 ) { WSACleanup( ); return; } } int IReactor::RegisterEvent(IEvent* pEvent) { cEventMap.insert(make_pair(pEvent->GetSocket(),pEvent));//注册事件 return TRUE; } B.具体多路复用组件(CSelectReactor): /////////////类声明/////////////// class CSelectReactor : public IReactor//继承于IReactor { public: int Init(); int Dispatch(); private: typedef struct { FD_SET stReadFDSet; FD_SET stWriteFDSet; FD_SET stExpFDSet; }SELECT_REACTOR_DATA;//select的3中FD,分别用于select函数 protected: SELECT_REACTOR_DATA stFDSet; }; /////////////类实现/////////////// int CSelectReactor::Init() { return 0; } int CSelectReactor::Dispatch() { while(1) { FD_ZERO(&stFDSet.stReadFDSet); FD_ZERO(&stFDSet.stWriteFDSet); FD_ZERO(&stFDSet.stExpFDSet); if (cEventMap.size() == 0) { break; } //1.根据Event_Type把事件增加到相应的FD中 for (SOCKET_MAP_ITER_DEF i = cEventMap.begin(); i != cEventMap.end(); i++) { switch (i->second->GetEventType()) { case enumEvent_Type_Read: { FD_SET(i->second->GetSocket(),&stFDSet.stReadFDSet); }break; case enumEvent_Type_Write: { FD_SET(i->second->GetSocket(),&stFDSet.stWriteFDSet); }break; } } //2.开始Select int nRet = select(0,&stFDSet.stReadFDSet,&stFDSet.stWriteFDSet,&stFDSet.stExpFDSet,NULL); if (nRet == SOCKET_ERROR)//出错 { break; } else if (nRet == 0)//超时 { } else { for (SOCKET_MAP_ITER_DEF i = cEventMap.begin(); i != cEventMap.end();) { BOOL bIsEvent = FALSE; if (FD_ISSET(i->second->GetSocket(),&stFDSet.stReadFDSet))//有Read事件 { i->second->ReadEvent(); bIsEvent = TRUE; } if (FD_ISSET(i->second->GetSocket(),&stFDSet.stWriteFDSet))//有Write事件 { i->second->WriteEvent(); bIsEvent = TRUE; } if(bIsEvent && i->second->IsEventPersist() == FALSE) { delete i->second;//!!!这里暂时这样做 i = cEventMap.erase(i); } else { i++; } } } } return 0; } C.事件组件接口(IEvent): /////////////类声明/////////////// class IEvent { public: IEvent() { bIsPersist = FALSE; } VOID SetEvent(SOCKET hSocket,EVENT_TYPE enumEventType);//设置一个Event的事件 SOCKET GetSocket(); EVENT_TYPE GetEventType();
VOID SetEventPersist(BOOL bIsPersist) { this->bIsPersist = bIsPersist; }//设置Event是否[持久性] BOOL IsEventPersist() { return bIsPersist == TRUE; } virtual int ReadEvent() = 0;//Read事件接口 virtual int WriteEvent() = 0;//Write事件接口 protected: SOCKET hSocket; EVENT_TYPE enumEventType; BOOL bIsPersist; }; /////////////类实现/////////////// VOID IEvent::SetEvent(SOCKET hSocket,EVENT_TYPE enumEventType) { this->hSocket = hSocket; this->enumEventType = enumEventType; } SOCKET IEvent::GetSocket() { return hSocket; } EVENT_TYPE IEvent::GetEventType() { return enumEventType; } D.具体事件组件(CAcceptEvent,CReadEvent): class CReadEvent : public IEvent { public: int ReadEvent()//Read事件 { char szBuf[1024 * 2] = {0}; int nRet = recv(GetSocket(),szBuf,1024 * 2,0);//从Socket读取数据 if (nRet == -1)//对方关闭Socket { SetEventPersist(FALSE); closesocket(GetSocket()); return 0; } cout<<"ReadBuffer:"<<szBuf<<endl; return 0; } int WriteEvent() { return 0; } }; class CAcceptEvent : public IEvent { public: int ReadEvent() { CReadEvent* pReadEvent = new CReadEvent();//客户端的Event SOCKADDR_IN stClientAddr = {0}; int nLen = sizeof(SOCKADDR_IN); SOCKET hClient = accept(GetSocket(),(SOCKADDR*)&stClientAddr,&nLen);//Accept客户端 pReadEvent->SetEvent(hClient,enumEvent_Type_Read); pReadEvent->SetEventPersist(TRUE); pSelectReactor->RegisterEvent(pReadEvent);//注册到Reactor中 return 0; } int WriteEvent() { cout<<"WriteEvent"<<endl; return 0; } CAcceptEvent(CSelectReactor* pSelectReactor) { this->pSelectReactor = pSelectReactor; } protected: CSelectReactor* pSelectReactor; }; E.Main函数: int main() { CSelectReactor cSelect; CAcceptEvent* pEvent = new CAcceptEvent(&cSelect); SOCKADDR_IN stSockAddr = {0}; stSockAddr.sin_addr.s_addr = INADDR_ANY; stSockAddr.sin_family = AF_INET; stSockAddr.sin_port = htons(9999); SOCKET hSocket = socket(AF_INET,SOCK_STREAM,0); bind(hSocket,(SOCKADDR*)&stSockAddr,sizeof(SOCKADDR)); listen(hSocket,5); pEvent->SetEvent(hSocket,enumEvent_Type_Read);//增加一个Listen的Event pEvent->SetEventPersist(TRUE); cSelect.RegisterEvent(pEvent);//注册到Reacotr中 cSelect.Dispatch();//Select多路复用 return 0; } |