ctor: ACE_Reactor (ACE_Reactor_Impl *implementation = 0,int delete_implementation = 0);
你可以自己创建一个ACE_Reactor
但是大多数时候,我们都是通过调用ACE_Reactor::instance()这个静态方法来返回唯一的实例。
ACE_Reactor
*
ACE_Reactor::instance( void )
{
ACE_TRACE( " ACE_Reactor::instance " );
if (ACE_Reactor::reactor_ == 0 ) //1
{
// PerformDouble-CheckedLockingOptimization.
ACE_MT(ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,ace_mon,
* ACE_Static_Object_Lock::instance(), 0 ));
if (ACE_Reactor::reactor_ == 0 ) //2
{
ACE_NEW_RETURN(ACE_Reactor::reactor_,
ACE_Reactor,
0 );
ACE_Reactor::delete_reactor_ = 1 ;
ACE_REGISTER_FRAMEWORK_COMPONENT(ACE_Reactor,ACE_Reactor::reactor_)
}
}
return ACE_Reactor::reactor_;
}
注意这点使用了双检测的机制(代码1和2),为了提高效率,不用每次都加锁。
ACE_Reactor::instance( void )
{
ACE_TRACE( " ACE_Reactor::instance " );
if (ACE_Reactor::reactor_ == 0 ) //1
{
// PerformDouble-CheckedLockingOptimization.
ACE_MT(ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,ace_mon,
* ACE_Static_Object_Lock::instance(), 0 ));
if (ACE_Reactor::reactor_ == 0 ) //2
{
ACE_NEW_RETURN(ACE_Reactor::reactor_,
ACE_Reactor,
0 );
ACE_Reactor::delete_reactor_ = 1 ;
ACE_REGISTER_FRAMEWORK_COMPONENT(ACE_Reactor,ACE_Reactor::reactor_)
}
}
return ACE_Reactor::reactor_;
}
2. Impl手法的运用
ACE_Reactor有个成员变量 ACE_Reactor_Impl *implementation_;
这个implementation_才是真正做事情的东西,典型的Impl手法。
为什么要多一个这个间隔层呢,主要是为了实现跨平台。
因为不同的平台的Reactor差异很大。
在Windows平台,实现这个是ACE_WFMO_Reactor
class
ACE_ExportACE_WFMO_Reactor:
public
ACE_Reactor_Impl
3. Event_Handle的管理
ACE_WFMO_Reactor把要管理的Handle都放在 ACE_WFMO_Reactor_Handler_Repository handler_rep_;
这里要注意的是io_handle和event_handle的区别
io_handle是真正的handle,比如socket_handle,thread_handle
而event_handle是绑定在io_handle上面的事件handle
有代码为证:
1
int
2 ACE_WFMO_Reactor::register_handler_i(ACE_HANDLEevent_handle,
3 ACE_HANDLEio_handle,
4 ACE_Event_Handler * event_handler,
5 ACE_Reactor_Masknew_masks)
6 {
7 // IfthisisaWinsock1system,theunderlyingeventassignmentwill
8 // notwork,sodon'ttry.Winsock1mustuseACE_Select_Reactorfor
9 // reactingtosocketactivity.
10
11 // Makesurethatthe<handle>isvalid
12 if (io_handle == ACE_INVALID_HANDLE)
13 io_handle = event_handler -> get_handle();
14
15 if ( this -> handler_rep_.invalid_handle(io_handle))
16 {
17 errno = ERROR_INVALID_HANDLE;
18 return - 1 ;
19 }
20
21 long new_network_events = 0 ;
22 int delete_event = 0 ;
23 auto_ptr < ACE_Auto_Event > event ;
24
25 // Lookuptherepositorytoseeifthe<event_handler>isalready
26 // there.
27 ACE_Reactor_Maskold_masks;
28 int found = this -> handler_rep_.modify_network_events_i(io_handle,
29 new_masks,
30 old_masks,
31 new_network_events,
32 event_handle,
33 delete_event,
34 ACE_Reactor::ADD_MASK);
35
36 // Checktoseeiftheuserpassedusavalidevent;Ifnotthenwe
37 // needtocreateone
38 if (event_handle == ACE_INVALID_HANDLE)
39 {
40 // Note:don'tchangethissincesomeC++compilershave
41 // <auto_ptr>sthatdon'tworkproperly
42 auto_ptr < ACE_Auto_Event > tmp( new ACE_Auto_Event);
43 event = tmp;
44 event_handle = event -> handle();
45 delete_event = 1 ;
46 }
47
48 int result = ::WSAEventSelect((SOCKET)io_handle,
49 event_handle,
50 new_network_events);
2 ACE_WFMO_Reactor::register_handler_i(ACE_HANDLEevent_handle,
3 ACE_HANDLEio_handle,
4 ACE_Event_Handler * event_handler,
5 ACE_Reactor_Masknew_masks)
6 {
7 // IfthisisaWinsock1system,theunderlyingeventassignmentwill
8 // notwork,sodon'ttry.Winsock1mustuseACE_Select_Reactorfor
9 // reactingtosocketactivity.
10
11 // Makesurethatthe<handle>isvalid
12 if (io_handle == ACE_INVALID_HANDLE)
13 io_handle = event_handler -> get_handle();
14
15 if ( this -> handler_rep_.invalid_handle(io_handle))
16 {
17 errno = ERROR_INVALID_HANDLE;
18 return - 1 ;
19 }
20
21 long new_network_events = 0 ;
22 int delete_event = 0 ;
23 auto_ptr < ACE_Auto_Event > event ;
24
25 // Lookuptherepositorytoseeifthe<event_handler>isalready
26 // there.
27 ACE_Reactor_Maskold_masks;
28 int found = this -> handler_rep_.modify_network_events_i(io_handle,
29 new_masks,
30 old_masks,
31 new_network_events,
32 event_handle,
33 delete_event,
34 ACE_Reactor::ADD_MASK);
35
36 // Checktoseeiftheuserpassedusavalidevent;Ifnotthenwe
37 // needtocreateone
38 if (event_handle == ACE_INVALID_HANDLE)
39 {
40 // Note:don'tchangethissincesomeC++compilershave
41 // <auto_ptr>sthatdon'tworkproperly
42 auto_ptr < ACE_Auto_Event > tmp( new ACE_Auto_Event);
43 event = tmp;
44 event_handle = event -> handle();
45 delete_event = 1 ;
46 }
47
48 int result = ::WSAEventSelect((SOCKET)io_handle,
49 event_handle,
50 new_network_events);
可以看到在42行create event,在44复制到event_handle,最后通过 WSAEventSelect将这个io_handle和event_handle绑定在一起了
这个register_handle调用一般都在Event_Handler的open函数中,实现了注册到Reactor中去。
4. 等待事件
如何检测到要发生的socket的事件呢?比如有新的client连接,收发。ACE并不直接调用select函数来取得。
ACE调用 WaitForMultipleObjectsEx来等待事件的发生。ACE这样做的好处是不但可以捕捉socket事件,也可以捕捉到其他事件。前面说过了每一个对应的socket都有一个event_handle与之对应。 WaitForMultipleObjectsEx会将发生事件的socket handle的index返回。这样ACE_Reactor就可以利用这个slot来查到io_handle和event_handler( 注意:event_handle和event_handler是不同的)
1
DWORD
2 ACE_WFMO_Reactor::wait_for_multiple_events( int timeout,
3 int alertable)
4 {
5 // Waitforanyofhandles_tobeactive,oruntiltimeoutexpires.
6 // If<alertable>isenabledallowasynchronouscompletionof
7 // ReadFileandWriteFileoperations.
8
9 return ::WaitForMultipleObjectsEx( this -> handler_rep_.max_handlep1(),
10 this -> handler_rep_.handles(),
11 FALSE,
12 timeout,
13 alertable);
14 }
2 ACE_WFMO_Reactor::wait_for_multiple_events( int timeout,
3 int alertable)
4 {
5 // Waitforanyofhandles_tobeactive,oruntiltimeoutexpires.
6 // If<alertable>isenabledallowasynchronouscompletionof
7 // ReadFileandWriteFileoperations.
8
9 return ::WaitForMultipleObjectsEx( this -> handler_rep_.max_handlep1(),
10 this -> handler_rep_.handles(),
11 FALSE,
12 timeout,
13 alertable);
14 }
5.分发事件
根据WaitForMultiObjectEx返回的slot就可以查询到event_handler来调用用户的处理函数了
1
int
2 ACE_WFMO_Reactor::complex_dispatch_handler(DWORDslot,
3 ACE_HANDLEevent_handle)
4 {
5 // ThisdispatchisusedforI/Oentires.
6
7 ACE_WFMO_Reactor_Handler_Repository::Current_Info & current_info =
8 this -> handler_rep_.current_info()[slot];
9
10 WSANETWORKEVENTSevents;
11 ACE_Reactor_Maskproblems = ACE_Event_Handler::NULL_MASK;
12 if (::WSAEnumNetworkEvents((SOCKET)current_info.io_handle_,
13 event_handle,
14 & events) == SOCKET_ERROR)
15 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
16 else
17 {
18 // Prepareforupcalls.Clearthebitsfrom<events>representing
19 // eventsthehandlerisnotinterestedin.Ifthereareanyleft,
20 // dotheupcall(s).upcallwillreplaceevents.lNetworkEvents
21 // withbitsrepresentinganyfunctionsthatrequestedarepeat
22 // callbackbeforecheckinghandlesagain.Inthiscase,continue
23 // tocallbackunlessthehandlerisunregisteredasaresultof
24 // oneoftheupcalls.Thewaythisiswritten,theupcallswill
25 // keepbeingdoneevenifoneormoreupcallsreportedproblems.
26 // Inpracticethismayturnoutnotsogood,butlet'ssee.Ifany
27 // problems,pleasenotifySteveHuston<shuston@riverace.com>
28 // beforeorafteryouchangethiscode.
29 events.lNetworkEvents &= current_info.network_events_;
30 while (events.lNetworkEvents != 0 )
31 {
32 ACE_Event_Handler * event_handler =
33 current_info.event_handler_;
34
35 int reference_counting_required =
36 event_handler -> reference_counting_policy().value() ==
37 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
38
39 // Calladd_reference()ifneeded.
40 if (reference_counting_required)
41 {
42 event_handler -> add_reference();
43 }
44
45 // Upcall
46 problems |= this -> upcall(current_info.event_handler_,
47 current_info.io_handle_,
48 events);
49
50 // Callremove_reference()ifneeded.
51 if (reference_counting_required)
52 {
53 event_handler -> remove_reference();
54 }
55
56 if ( this -> handler_rep_.scheduled_for_deletion(slot))
57 break ;
58 }
59 }
60
61 if (problems != ACE_Event_Handler::NULL_MASK
62 && ! this -> handler_rep_.scheduled_for_deletion(slot))
63 this -> handler_rep_.unbind(event_handle,problems);
64
65 return 0 ;
66 }
这里值得注意的是ACE通过调用WSAEnumNetworkEvents来重置event_handle。 原文链接:https://www.f2er.com/react/308289.html2 ACE_WFMO_Reactor::complex_dispatch_handler(DWORDslot,
3 ACE_HANDLEevent_handle)
4 {
5 // ThisdispatchisusedforI/Oentires.
6
7 ACE_WFMO_Reactor_Handler_Repository::Current_Info & current_info =
8 this -> handler_rep_.current_info()[slot];
9
10 WSANETWORKEVENTSevents;
11 ACE_Reactor_Maskproblems = ACE_Event_Handler::NULL_MASK;
12 if (::WSAEnumNetworkEvents((SOCKET)current_info.io_handle_,
13 event_handle,
14 & events) == SOCKET_ERROR)
15 problems = ACE_Event_Handler::ALL_EVENTS_MASK;
16 else
17 {
18 // Prepareforupcalls.Clearthebitsfrom<events>representing
19 // eventsthehandlerisnotinterestedin.Ifthereareanyleft,
20 // dotheupcall(s).upcallwillreplaceevents.lNetworkEvents
21 // withbitsrepresentinganyfunctionsthatrequestedarepeat
22 // callbackbeforecheckinghandlesagain.Inthiscase,continue
23 // tocallbackunlessthehandlerisunregisteredasaresultof
24 // oneoftheupcalls.Thewaythisiswritten,theupcallswill
25 // keepbeingdoneevenifoneormoreupcallsreportedproblems.
26 // Inpracticethismayturnoutnotsogood,butlet'ssee.Ifany
27 // problems,pleasenotifySteveHuston<shuston@riverace.com>
28 // beforeorafteryouchangethiscode.
29 events.lNetworkEvents &= current_info.network_events_;
30 while (events.lNetworkEvents != 0 )
31 {
32 ACE_Event_Handler * event_handler =
33 current_info.event_handler_;
34
35 int reference_counting_required =
36 event_handler -> reference_counting_policy().value() ==
37 ACE_Event_Handler::Reference_Counting_Policy::ENABLED;
38
39 // Calladd_reference()ifneeded.
40 if (reference_counting_required)
41 {
42 event_handler -> add_reference();
43 }
44
45 // Upcall
46 problems |= this -> upcall(current_info.event_handler_,
47 current_info.io_handle_,
48 events);
49
50 // Callremove_reference()ifneeded.
51 if (reference_counting_required)
52 {
53 event_handler -> remove_reference();
54 }
55
56 if ( this -> handler_rep_.scheduled_for_deletion(slot))
57 break ;
58 }
59 }
60
61 if (problems != ACE_Event_Handler::NULL_MASK
62 && ! this -> handler_rep_.scheduled_for_deletion(slot))
63 this -> handler_rep_.unbind(event_handle,problems);
64
65 return 0 ;
66 }