event处理流程
Libevent处理时间的大概流程为
1、设置event_base(即初始化Reactor)
2、设置event事件(初始化event)
3、将event和event_base关联(将event注册到event_base)
4、进入循环,等待事件
5、事件发生,处理事件。
用UML序列图可以表示为:
#include <iostream>
#include <sys/time.h>
#include <event.h>
struct event ev;
struct timeval tv;
void time_cb(int fd,short event,void* argc)
{
std::cout<<"Timer wakeup"<<std::endl;
event_add(&ev,&tv);
}
int main()
{
struct event_base* base=event_base_new();
tv.tv_sec=10;
tv.tv_usec=0;
evtimer_set(&ev,time_cb,NULL);//设置定时器事件
event_base_set(base,&ev);//将event和event_base关联
event_add(&ev,&tv);//注册事件
event_base_dispatch(base);//进入loop循环,等待事件
return 0;
}
首先创建了event_base作为Reactor,设置了定时器事件作为event。随后将ev和base关联,注册ev到事件分发器(evsel,在event_base中)。最后进入loop循环,等待事件发生。
看一下event_base_set(base,&ev)
int event_base_set(struct event_base *base,struct event *ev)
{
/* Only innocent events may be assigned to a different base */
if (ev->ev_flags != EVLIST_INIT)//确保event已经初始化
return (-1);
_event_debug_assert_is_setup(ev);
ev->ev_base = base;//设置event_base为event的Reactor
ev->ev_pri = base->nactivequeues/2;//event的优先级
return (0);
}
可以看出这一步只是将event和event_base关联,即在event中保存它所在的Reactor。
再来看看event_add(&ev,&tv)
int event_add(struct event *ev,const struct timeval *tv)
{
int res;
if (EVUTIL_FAILURE_CHECK(!ev->ev_base)) {
event_warnx("%s: event has no event_base set.",__func__);
return -1;
}
EVBASE_ACQUIRE_LOCK(ev->ev_base,th_base_lock);
res = event_add_internal(ev,tv,0);//这里才是主体部分
EVBASE_RELEASE_LOCK(ev->ev_base,th_base_lock);
return (res);
}
再来看一下event_add_internal(ev,0),略去了部分代码
static inline int
event_add_internal(struct event *ev,const struct timeval *tv,int tv_is_absolute)
{
struct event_base *base = ev->ev_base;
int res = 0;
int notify = 0;
if (tv != NULL && !(ev->ev_flags & EVLIST_TIMEOUT)) {//tv不为空,且要添加到time堆中,则为time堆扩容
if (min_heap_reserve(&base->timeheap,1 + min_heap_size(&base->timeheap)) == -1)
return (-1); /* ENOMEM == errno */
}
/*如果事件是IO或信号事件,且事件已经添加或激活,则插入到相应队列 */
if ((ev->ev_events & (EV_READ|EV_WRITE|EV_SIGNAL)) &&
!(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) {
if (ev->ev_events & (EV_READ|EV_WRITE))//IO事件
res = evmap_io_add(base,ev->ev_fd,ev);//添加到IO事件队列
else if (ev->ev_events & EV_SIGNAL)//信号事件
res = evmap_signal_add(base,(int)ev->ev_fd,ev);//添加到信号事件队列
if (res != -1)
event_queue_insert(base,ev,EVLIST_INSERTED);//将事件插入到队列base->eventqueue
if (res == 1) {
/* evmap says we need to notify the main thread. */
notify = 1;
res = 0;
}
}
gettime(base,&now);//更新base中的time
event_queue_insert(base,EVLIST_TIMEOUT);//添加到定时器小根堆
/* if we are not in the right thread,we need to wake up the loop */
if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))//如果当前线程不是loop所在线程,唤醒loop线程。
evthread_notify_base(base);
return (res);
}
最后看一下event_base_dispatch(base);
int event_base_dispatch(struct event_base *event_base)
{
return (event_base_loop(event_base,0));
}
看一下event_base_loop(event_base,0)
int
event_base_loop(struct event_base *base,int flags)
{
const struct eventop *evsel = base->evsel;//事件分发器 event demultiplexer
struct timeval tv;
struct timeval *tv_p;
int res,done,retval = 0;
if (base->running_loop) {//loop循环只能在event_base所在线程
event_warnx("%s: reentrant invocation. Only one event_base_loop"
" can run on each event_base at once.",__func__);
EVBASE_RELEASE_LOCK(base,th_base_lock);
return -1;
}
base->running_loop = 1;//表明even_base在运行loop,防止其他线程运行
clear_time_cache(base);//置零tv_cache
if (base->sig.ev_signal_added && base->sig.ev_n_signals_added)//如果信号事件设置
evsig_set_base(base);//设置signal_pair[0]位信号源
done = 0;
base->event_gotterm = base->event_break = 0;
while (!done) {//进入loop循环主体
base->event_continue = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
break;
}
if (base->event_break) {
break;
}
timeout_correct(base,&tv);//更新时间
tv_p = &tv;
if (!N_ACTIVE_CALLBACKS(base) && !(flags & EVLOOP_NONBLOCK)) {
timeout_next(base,&tv_p);
} else {
/*
* if we have active events,we just poll new events
* without waiting.
*/
evutil_timerclear(&tv);
}
/* If we have no events,we just exit */
if (!event_haveevents(base) && !N_ACTIVE_CALLBACKS(base)) {
event_debug(("%s: no events registered.",__func__));
retval = 1;
goto done;//用了goto?
}
/* update last old time */
gettime(base,&base->event_tv);
clear_time_cache(base);
res = evsel->dispatch(base,tv_p);//epoll_wait等,阻塞等待事件
if (res == -1) {
event_debug(("%s: dispatch returned unsuccessfully.",__func__));
retval = -1;
goto done;
}
update_time_cache(base);
timeout_process(base);//处理到时事件
if (N_ACTIVE_CALLBACKS(base)) {
int n = event_process_active(base);//处理事件,处理过程中有优先级
if ((flags & EVLOOP_ONCE)
&& N_ACTIVE_CALLBACKS(base) == 0
&& n != 0)
done = 1;
} else if (flags & EVLOOP_NONBLOCK)
done = 1;
}
done://goto用的label
clear_time_cache(base);
base->running_loop = 0;
EVBASE_RELEASE_LOCK(base,th_base_lock);
return (retval);
}