ACE 6.0.0 里提供了一个 Reactor 示例程序,下面对其进行分析(分析内容写在代码的注释里)。
Reactors_Test.cpp文件(在 \ACE-6.0.0\ACE_wrappers\tests\ 目录下):
// $Id: Reactors_Test.cpp 91671 2010-09-08 18:39:23Z johnnyw $ // ============================================================================ // // = LIBRARY // tests // // = FILENAME // Reactors_Test.cpp // // = DESCRIPTION // This is a test that performs a torture test of multiple // <ACE_Reactors> and <ACE_Tasks> in the same process. // // = AUTHOR // Prashant Jain <pjain@cs.wustl.edu>,// Detlef Becker <Detlef.Becker@med.siemens.de>,and // Douglas C. Schmidt <schmidt@cs.wustl.edu> // // ============================================================================ #include "test_config.h" #include "ace/Task.h" #include "ace/Reactor.h" #include "ace/Atomic_Op.h" #include "ace/Recursive_Thread_Mutex.h" #if defined (ACE_HAS_THREADS) ACE_Thread_Manager *thr_mgr; static const int MAX_TASKS = 20; class Test_Task : public ACE_Task<ACE_MT_SYNCH> // = TITLE // Exercise the tasks. { public: // = Initialization and termination methods. Test_Task (void); ~Test_Task (void); //FUZZ: disable check_for_lack_ACE_OS // = Task hooks. virtual int open (void *args = 0); virtual int close (u_long flags = 0); virtual int svc (void); //FUZZ: enable check_for_lack_ACE_OS // = Event Handler hooks. virtual int handle_input (ACE_HANDLE handle); virtual int handle_close (ACE_HANDLE fd,ACE_Reactor_Mask close_mask); private: size_t handled_; // Number of iterations handled. static int task_count_; // Number of tasks running. }; // Static data member initialization. 
int Test_Task::task_count_ = 0; static ACE_Atomic_Op<ACE_Thread_Mutex,int> done_count = MAX_TASKS * 2; static ACE_Recursive_Thread_Mutex recursive_lock; Test_Task::Test_Task (void) : handled_ (0) { ACE_GUARD (ACE_Recursive_Thread_Mutex,ace_mon,recursive_lock); Test_Task::task_count_++; ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) TT+ Test_Task::task_count_ = %d\n"),Test_Task::task_count_)); } Test_Task::~Test_Task (void) { ACE_GUARD (ACE_Recursive_Thread_Mutex,recursive_lock); ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) TT- Test_Task::task_count_ = %d\n"),Test_Task::task_count_)); ACE_ASSERT (Test_Task::task_count_ == 0); } int Test_Task::open (void *args) { this->reactor (reinterpret_cast<ACE_Reactor *> (args)); return this->activate (THR_NEW_LWP); // 只有调用了activate方法后,才能自动执行后面的svc方法。 } int Test_Task::close (u_long) { ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,recursive_lock,-1); Test_Task::task_count_--; ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) close Test_Task::task_count_ = %d\n"),Test_Task::task_count_)); ACE_ASSERT (Test_Task::task_count_ >= 0); return 0; } int Test_Task::svc (void) // 在上面调用了activate方法后,任务自动执行svc方法。 { ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) svc\n"))); for (size_t i = 0; i < ACE_MAX_ITERATIONS; i++) { ACE_OS::thr_yield (); // Only wait up to 10 milliseconds to notify the Reactor. 
ACE_Time_Value timeout (0,10 * 1000); // 调用 Reactor 的 notify 方法向 Reactor 发送通知,后面的 handle_input 和 handle_close 方法在 Reactor 处理通知(在下面)时被回调。 if (this->reactor ()->notify (this,ACE_Event_Handler::READ_MASK,&timeout) == -1) { if (errno == ETIME) ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) %p\n"),ACE_TEXT ("notify() timed out"))); else ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("notify")),-1); } } return 0; } int Test_Task::handle_close (ACE_HANDLE,ACE_Reactor_Mask) { return 0; } int Test_Task::handle_input (ACE_HANDLE) { this->handled_++; if (this->handled_ == ACE_MAX_ITERATIONS) { done_count--; ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) handle_input,handled_ = %d,done_count = %d\n"),this->handled_,done_count.value ())); } ACE_OS::thr_yield (); return -1; } static void * worker (void *args) { ACE_Reactor *reactor = reinterpret_cast<ACE_Reactor *> (args); // Make this thread the owner of the Reactor's event loop. reactor->owner (ACE_Thread::self ()); // Use a timeout to inform the Reactor when to shutdown. ACE_Time_Value timeout (4); for (;;) switch (reactor->handle_events (timeout)) // 调用 Reactor 的 handle_events 方法,循环处理任务发来的通知(在上面),回调任务的 handle_input 和 handle_close 方法。 { case -1: ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("reactor")),0); /* NOTREACHED */ case 0: ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) Reactor shutdown\n"))); return 0; } ACE_NOTREACHED (return 0); } #endif /* ACE_HAS_THREADS */ int run_main (int,ACE_TCHAR *[]) { ACE_START_TEST (ACE_TEXT ("Reactors_Test")); #if defined (ACE_HAS_THREADS) ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); thr_mgr = ACE_Thread_Manager::instance (); ACE_Reactor reactor; ACE_ASSERT (ACE_LOG_MSG->op_status () != -1); Test_Task tt1[MAX_TASKS]; Test_Task tt2[MAX_TASKS]; // Activate all of the Tasks. for (int i = 0; i < MAX_TASKS; i++) { tt1[i].open (ACE_Reactor::instance ()); // 给任务关联Reactor。 tt2[i].open (&reactor); // 给任务关联Reactor。 } //创建两线程,分别监视两个Reactor。 // Spawn two threads each running a different reactor. 
if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (worker),(void *) ACE_Reactor::instance (),THR_BOUND | THR_DETACHED) == -1) ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("%p\n"),ACE_TEXT ("spawn")),-1); else if (ACE_Thread_Manager::instance ()->spawn (ACE_THR_FUNC (worker),(void *) &reactor,-1); // 等待所有线程结束。 if (ACE_Thread_Manager::instance ()->wait () == -1) ACE_ERROR_RETURN ((LM_ERROR,ACE_TEXT ("wait")),-1); ACE_DEBUG ((LM_DEBUG,ACE_TEXT ("(%t) all threads are finished\n"))); #else ACE_ERROR ((LM_INFO,ACE_TEXT ("threads not supported on this platform\n"))); #endif /* ACE_HAS_THREADS */ ACE_END_TEST; return 0; }
在run_main函数里写上下面输出语句,看看 ACE_Reactor::instance() 和 &reactor 是否相等。
// Print the three addresses with %p so they can be compared; %d is not a
// valid conversion for pointers and truncates them on 64-bit platforms.
printf("ACE_Reactor::instance() = %p,&reactor = %p,reactor.instance() = %p\n",
       static_cast<void *>(ACE_Reactor::instance()),
       static_cast<void *>(&reactor),
       static_cast<void *>(reactor.instance()));
运行发现:ACE_Reactor::instance() 和 reactor.instance() 相等(instance() 是静态成员函数,通过对象调用与通过类名调用是同一个函数),但二者与 &reactor 不相等。
查看ACE源代码,得知应该是下面原因。
ACE_Reactor 类里有 reactor_ 静态成员:
/// Pointer to a process-wide ACE_Reactor singleton.(指向一个进程内的ACE_Reactor单例)
static ACE_Reactor *reactor_;
但ACE_Reactor的构造函数里并没有对 reactor_ 如下赋值:
reactor_ = this;
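所以 ACE_Reactor reactor; 语句只是构造了一个独立的局部 Reactor 对象,并不会把 &reactor 设为进程级单例;ACE_Reactor::instance() 返回的是 reactor_ 所指向的单例(首次调用时由 ACE 另行创建),因此它与 &reactor 的地址不同。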
原文链接:/react/308137.html