源码可以到http://www.aoc.nrao.edu/php/tjuerges/ALMA/ACE-5.5.2/html/ace/上去找
虽然ACE_Select_Reactor是灵活的,但是由于只有拥有者才能调用它的handle_events方法,所以在多线程应用程序中它还是有一定的局限性的,因而ACE_Select_Reactor得事件多路分离层的序列化处理可能对某些场景来讲约束太多了。一种解决方法是多个线程去运行独立ACE_Select_Reactor的实例,但是这样需要开发人员去实现一个代理在各反应器之间均衡的分配事件处理,以便达到均衡负载。另一种即这里的ACE_TP_Reactor,基于线程池的Reactor。
其实现了Learder/Follower模型,从而提供了一种有效的并发性模型,多个线程可以针对一组I/O句柄轮流调用select去检测并处理事件。
最关键的地方在于:
1.分派事件(其实就是该线程自己执行事件处理函数)前,会释放Token,让其他线程能够去select或者处理还未处理的事件
2.获取到token的线程,会先检测是否有未处理的事件,若有则先处理,若没有则占用token进行select阻塞检测事件
这里的keypoint1,可以从如下代码中看出来:
int ACE_TP_Reactor::handle_socket_events ( int & event_count,ACE_TP_Token_Guard & g
)
00466 {
00467
00468 // We got the lock,lets handle some I/O events.
00469 ACE_EH_Dispatch_Info dispatch_info;
00470
00471 this->get_socket_event_info (dispatch_info);
00472
00473 // If there is any event handler that is ready to be dispatched,the
00474 // dispatch information is recorded in dispatch_info.
00475 if (!dispatch_info.dispatch ())
00476 {
00477 return 0;
00478 }
00479
00480 // Suspend the handler so that other threads don't start dispatching
00481 // it,if we can't suspend then return directly
00482 //
00483 // NOTE: This check was performed in older versions of the
00484 // TP_Reactor. Looks like it is a waste..
00485 if (dispatch_info.event_handler_ != this->notify_handler_)
00486 if (this->suspend_i (dispatch_info.handle_) == -1)
00487 return 0;
00488
00489 // Call add_reference() if needed.
00490 if (dispatch_info.reference_counting_required_)
00491 dispatch_info.event_handler_->add_reference ();
00492
00493 // Release the lock. Others threads can start waiting.
00494 guard.release_token ();
00495
00496 int result = 0;
00497
00498 // If there was an event handler ready,dispatch it.
00499 // Decrement the event left
00500 --event_count;
00501
00502 // Dispatched an event
00503 if (this->dispatch_socket_event (dispatch_info) == 0)
00504 ++result;
00505
00506 return result;
00507 }
上述的第一点,也即是这里的:
00493 // Release the lock. Others threads can start waiting. 00494 guard.release_token ();
说明里也讲的很清楚,会将token释放让其他线程去等待。
那么keypoint2,又是如何完成的了呢?看下面的代码:
int ACE_TP_Reactor::get_event_for_dispatching ( ACE_Time_Value * max_wait_time )
00511 {
00512 // If the reactor handler state has changed,clear any remembered
00513 // ready bits and re-scan from the master wait_set.
.............
00534 return this->wait_for_multiple_events (this->ready_set_,00535 max_wait_time);
00536 }
//看下wait_for_multiple_events的实现
template<class ACE_SELECT_REACTOR_TOKEN>
int ACE_Select_Reactor_T< ACE_SELECT_REACTOR_TOKEN >::wait_for_multiple_events ( ACE_Select_Reactor_Handle_Set &,ACE_Time_Value *
)
01074 {
01075 ACE_TRACE ("ACE_Select_Reactor_T::wait_for_multiple_events");
01076 u_long width = 0;
01077 ACE_Time_Value timer_buf (0);
01078 ACE_Time_Value *this_timeout;
01079
01080 int number_of_active_handles = this->any_ready (dispatch_set);
01081
01082 // If there are any bits enabled in the <ready_set_> then we'll
01083 // handle those first,otherwise we'll block in <select>.
01084
01085 if (number_of_active_handles == 0)
01086 {
01087 do
01088 {
01089 this_timeout =
01090 this->timer_queue_->calculate_timeout (max_wait_time,01091 &timer_buf);
01092 width = (u_long) this->handler_rep_.max_handlep1 ();
01093
01094 dispatch_set.rd_mask_ = this->wait_set_.rd_mask_;
01095 dispatch_set.wr_mask_ = this->wait_set_.wr_mask_;
01096 dispatch_set.ex_mask_ = this->wait_set_.ex_mask_;
01097 number_of_active_handles = ACE_OS::select (int (width),01098 dispatch_set.rd_mask_,01099 dispatch_set.wr_mask_,01100 dispatch_set.ex_mask_,01101 this_timeout);
01102 }
..........
这里的实现ACE_TP_Reactor是直接使用的ACE_Select_Reactor_T模板类的,但是其处理也是这样的,先检测是否已经有触发的事件any_ready ,若有则不再去执行select。
Perform Double-Checked Locking Optimization.?