在Addison-Wesley - C++NetworkProgrammingVol2的4.3 The ACE_TP_Reactor Class中有这样几句话,让我足足想了一天。
1.Multiple threads running an ACE_TP_Reactor event loop can process events concurrently on different handles
2.They can also dispatch timeout and I/O callback methods concurrently on the same event handler
3.The only serialization in the ACE_TP_Reactor occurs when I/O events occur concurrently on the same handle
4.After a thread obtains a set of active handles from select(),the other reactor threads dispatch from that handle set instead of calling select() again
其实上面的3句话主要表达这样的几层含义
1.并行处理不同handle上的事件
2.并行处理同一event handler上的超时回调函数和I/O回调函数
3.多个线程串行处理同一个handle上的I/O事件。
4.虽然TP_Reactor是用leader/follow模式轮流调用select,但是如果一个select()获得了多个激活的handle,那么其他的线程会分发这些handle ,而不是去再次调用select.
(这点还没有想通,也没有看见是如何实现的?)
“多个线程串行处理同一个handle上的I/O事件”
这个是如何达到的呢?ACE源码中,当处理I/O事件的时候,会将 HANDLE挂起,使得不再对该 HANDLE做事件侦听。来达到同一个 handle上的I/O事件是被多个线程串行地处理。
"并行处理同一event handler上的超时回掉函数和I/O回调函数"
这样好像就比较麻烦了。因为这就意味着TP_Reactor只保证同一个handle下不会有多线程同时调用I/O事件,但是却有可能同时调用超时回调函 数和I/O回调函数。如果在这两个函数中有对数据的访问和操作,这就意味着需要有锁的引入。例外,如果在定时器处理中,超过定时的事件间隔,就会有令一个 线程再次调用定时器的处理函数,一下子引入了很多同步的问题。如何解决这个问题呢?
方法一:
更改ACE的源码,象处理socket事件一样,在处理定时事件的时候,也把HANDLE挂起。来自http://cpunion.cnblogs.com/archive/2005/08/09/210941.html
int
ACE_TP_Reactor::handle_timer_events(&/*event_count*/,
ACE_TP_Token_Guardguard)
{
//Getthecurrenttime
ACE_Time_Valuecur_time(this->timer_queue_gettimeofday()+timer_skew());
Lookforanodeinthetimerqueuewhosetimer<=thepresent
time.ACE_Timer_Node_Dispatch_Infoinfo;
if(dispatch_info(cur_time,
info))
{
********fixedbylijie***********(info.type_get_handle()!=ACE_INVALID_HANDLE)
{
is_suspended_i(info.type_get_handle()))
return0;
suspend_handler(info.type_get_handle());
}
********end**********************constvoid*upcall_act=Preinvoke.preinvoke(info,
cur_time,
upcall_act);
Releasethetokenbeforedispatchingnotifiesguard.release_token();
callthefunctorupcall(info,
cur_time);
Postinvokepostinvoke(info,
upcall_act);
Wehavedispatchedatimer1;
}
;
}
ACE_TP_Reactor::handle_timer_events(&/*event_count*/,
ACE_TP_Token_Guardguard)
{
//Getthecurrenttime
ACE_Time_Valuecur_time(this->timer_queue_gettimeofday()+timer_skew());
Lookforanodeinthetimerqueuewhosetimer<=thepresent
time.ACE_Timer_Node_Dispatch_Infoinfo;
if(dispatch_info(cur_time,
info))
{
********fixedbylijie***********(info.type_get_handle()!=ACE_INVALID_HANDLE)
{
is_suspended_i(info.type_get_handle()))
return0;
suspend_handler(info.type_get_handle());
}
********end**********************constvoid*upcall_act=Preinvoke.preinvoke(info,
cur_time,
upcall_act);
Releasethetokenbeforedispatchingnotifiesguard.release_token();
callthefunctorupcall(info,
cur_time);
Postinvokepostinvoke(info,
upcall_act);
Wehavedispatchedatimer1;
}
;
}
handle_timer处理完以后,返回以前,加上这句话
this->reactor ()->resume_handler (this->get_handle ());
当然别忘了为Handler编写get_handle()函数:
ACE_HANDLETest_Handler::get_handle()
{
peer().get_handle();
}
{
peer().get_handle();
}