事件等待模式,也就所谓的reactor模式,简单的讲就是通过监听等待io的事件,然后将等待就绪可以处理的事件,分发给对应的io处理程序进行处理,这种模式本身并不算是异步io,但是TBox的asio库,在类unix系统上,也是基于此模型实现的,所以在讲解真正的异步io之前,首先稍微介绍介绍,对asio的底层机制有个大体了解。
在类unix系统,例如linux的epoll,mac的kqueue,以及select、poll,dev/poll等等,都是可以用来实现reactor
虽然功能大体相同,但是效率上来讲, epoll和kqueue 更加的高效,因为他们没有像select,poll那样通过轮询来实现
单从接口设计上来说,kqueue的设计效率更高些,因为它可以一次批量处理多个事件,所以跟系统内核间的交互比较少。但是具体epoll和kqueue那个效率更高,就不好说了。
TBox下的asio底层实现,大体分为两种:
-
@H_403_11@基于reactor模型,通过对epoll,kqueue、poll、select等api的分装实现的aiop接口(也就是本章所要讲的) 在一个独立的线程内,进行事件监听,来分发处理各种异步io事件,也就实现了proactor的模式。
@H_403_11@直接系统原生支持的proactor模型,例如windows的iocp,并在其基础上做一些封装来实现。
这样,对于上层应用来说,用的都是同一套asio的异步回调接口,不需要做什么的改变,但是底层根据不同平台其实现机制,已大不相同,windows上用了iocp,linux和android用了epoll,mac上用了kqueue,ios上用了poll。。
言归正传,本章所要将的aiop接口,就是对reactor的各种poll接口的上层封装,对于一些不需要将就高性能的应用直接使用aiop更加的简洁,易维护。 而基于回调的proactor模式,等下一章再详细讲解。
首先我先描述下几个对象类型:
-
@H_403_11@aiop: 事件等待对象池
@H_403_11@aioo: 等待对象,关联和维护:socket句柄、用户私有数据
@H_403_11@aioe: 事件对象,一个aioo对象一次可以等待多个不同类型的aioe事件对象,例如send,recv,acpt,conn,...
@H_403_11@code: 事件代码
接着我们看下直接使用aiop的简单服务器代码,(这里为了看起来简洁点,资源管理和释放上我就先省略了):
tb_int_t main(tb_int_t argc,tb_char_t** argv) { // 初始化一个tcp的套接字,用于监听 tb_socket_ref_t listen_sock = tb_socket_init(TB_SOCKET_TYPE_TCP); tb_assert_and_check_return_val(sock,0); // 初始化aiop轮询池,对象规模16个socket,可混用,如果传0,则使用默认值 tb_aiop_ref_t aiop = tb_aiop_init(16); tb_assert_and_check_return_val(aiop,0); // 绑定ip和端口,这里ip没做绑定,传null,监听端口为9090 if (!tb_socket_bind(listen_sock,tb_null,9090)) return 0; // 监听socket if (!tb_socket_listen(listen_sock,5)) return 0; // 将这个监听套接字添加到aiop中,并附上accept等待事件 if (!tb_aiop_addo(aiop,listen_sock,TB_AIOE_CODE_ACPT,tb_null)) return 0; // 初始化一个aioe的事件对象列表,用于获取等待返回的事件 tb_aioe_t list[16]; // 开启循环 while (1) { /* 等待事件到来,用到跟epoll,select类似 * * 16: 最大需要等待的事件数量 * -1: 等待超时值,这里为永久等待 * * objn为返回的有效事件数,如果失败返回:-1, 超时返回:0 */ tb_long_t objn = tb_aiop_wait(aiop,list,16,-1); tb_assert_and_check_break(objn >= 0); // 枚举等到的事件列表 tb_size_t i = 0; for (i = 0; i < objn; i++) { // 获取事件对应aioo对象句柄,aioo对socket句柄,事件类型、关联的私有数据进行了同一维护 tb_aioo_ref_t aioo = list[i].aioo; // 获取aioe事件关联的私有数据指针 // 也可以通过tb_aioo_priv(aioo)获取 tb_cpointer_t priv = list[i].priv; // 获取aioo对应的socket句柄 tb_socket_ref_t sock = tb_aioo_sock(aioo); // 有accept事件? if (list[i].code & TB_AIOE_CODE_ACPT) { // 接收对方的连接,返回对应的客户端socket tb_socket_ref_t client_sock = tb_socket_accept(sock,tb_null); tb_assert_and_check_break(client_sock); /* 将客户端的socket也添加到aiop池中,并等待它的recv事件 * * 这里的最后一个参数,可以传一个私有的数据指针,并和sock进行关联 * 用于方便维护每个连接对应的会话数据,这里随便传了个字符串 * * 返回的aioo对象可以保存下来,并在之后可以灵活修改需要等待的事件 * * 注:这里的client_sock需要在自己的应用内,自己做释放,aiop不会去自动释放 * 因为这个实在外围代码自己创建的,这里的例子仅仅为了省事,就直接忽略了。 */ tb_aioo_ref_t aioo = tb_aiop_addo(aiop,client_sock,TB_AIOE_CODE_RECV,"private data"); tb_assert_and_check_break(aioo); } // 有recv事件? else if (list[i].code & TB_AIOE_CODE_RECV) { // 非阻塞接收一段数据 tb_byte_t data[8192]; tb_long_t real = tb_socket_recv(sock,data,sizeof(data)); // 接收完指定数据后,省略部分代码 // ... // 将socket改为等待发送 if (!tb_aiop_sete(aiop,aioo,TB_AIOE_CODE_SEND,tb_null)) break; // 尝试发送数据,也许没发完 tb_socket_send(sock,"hello",sizeof("hello")); } // 有send事件? else if (list[i].code & TB_AIOE_CODE_SEND) { // 继续发送上回没发完的数据 // tb_socket_send(..); // 删除对应aioo事件对象,取消监听这个sock的事件 tb_aiop_delo(aiop,aioo); } // 有连接事件? else if (list[i].code & TB_AIOE_CODE_CONN) { // 此处不会进入,仅仅在tb_socket_connect之后,并且注册了aioo,才会有此事件 // ... } // 错误代码处理 else { tb_trace_e("unknown code: %lu",list[i].code); break; } } } return 0; }
这里的代码,仅仅描述了下aiop的接口调用流程,没有实际的服务器业务逻辑,仅作参考,请不要照搬复制的使用。
代码里提到的aioo对象,还有个单独的等待接口,用于直接等待单个socket对象,一般在basic_stream的wait里面使用:
/*! 等待单个socket句柄的事件 * * @param socket socket句柄 * @param code aioe等待事件代码 * @param timeout 等待超时时间,永久等待传:-1 * * @return > 0: 等到的事件,0: 超时,-1: 失败 */ tb_long_t tb_aioo_wait(tb_socket_ref_t socket,tb_size_t code,tb_long_t timeout);
aiop的接口并不多,我这里不多做描述了,就简单的列举下吧:
/*! 初始化aiop事件等待池 * * @param maxn 等待对象的规模数量 * * @return aiop池 */ tb_aiop_ref_t tb_aiop_init(tb_size_t maxn); /*! 退出aiop * * @param aiop aiop池 */ tb_void_t tb_aiop_exit(tb_aiop_ref_t aiop); /*! 清除所有aiop中的aioo等待对象 * * @param aiop aiop池 */ tb_void_t tb_aiop_cler(tb_aiop_ref_t aiop); /*! 强制退出aiop的等待,tb_aiop_wait会返回:-1,并且退出循环,无法再次等待 * * @param aiop aiop池 */ tb_void_t tb_aiop_kill(tb_aiop_ref_t aiop); /*! 退出aiop的等待,tb_aiop_wait会返回:0,但不退出等待循环,还可继续等待 * * @param aiop aiop池 */ tb_void_t tb_aiop_spak(tb_aiop_ref_t aiop); /*! 添加一个socket等待对象aioo,并且关联等待的事件代码code,以及私有数据priv * * @param aiop aiop池 * @param socket socket句柄 * @param code 等待的事件代码 * @param priv 关联的用户私有数据 * * @return aioo对象 */ tb_aioo_ref_t tb_aiop_addo(tb_aiop_ref_t aiop,tb_socket_ref_t socket,tb_cpointer_t priv); /*! 删除一个aioo等待对象,永远不再等待它 * * @param aiop aiop池 * @param aioo aioo对象句柄 * */ tb_void_t tb_aiop_delo(tb_aiop_ref_t aiop,tb_aioo_ref_t aioo); /*! 投递一个aioe等待事件对象 * * @param aiop aiop池 * @param aioe aioe事件对象 * * @return 投递成功返回:tb_true,失败返回:tb_false */ tb_bool_t tb_aiop_post(tb_aiop_ref_t aiop,tb_aioe_t const* aioe); /*! 设置和修改aioo对象的等待事件 * * @param aiop aiop池 * @param aioo aioo对象 * @param code 等待事件代码 * @param priv 用户私有数据 * * @return 成功返回:tb_true,失败返回:tb_false */ tb_bool_t tb_aiop_sete(tb_aiop_ref_t aiop,tb_aioo_ref_t aioo,tb_cpointer_t priv); /*! 等待一定量的aioe事件 * * @param aiop aiop池 * @param list aioe事件列表,用于保存成功返回的事件对象 * @param maxn 指定等待多少的事件 * @param timeout 等待超时值,永久等待:-1 * * @return > 0: 实际等到的aioe事件对象数,-1: 失败 */ tb_long_t tb_aiop_wait(tb_aiop_ref_t aiop,tb_aioe_t* list,tb_size_t maxn,tb_long_t timeout);
虽然aiop的reactor模式处理并发io已经相当方便了,但是由于系统底层对其的实现的程度不一,像windows上只能通过select来实现,因此对于一些高性能并发io的处理上,还是有些力不从心的,为了更好的利用不同系统提供的io处理特性,达到更好的并发性,又要保证上层调用接口的简单、统一,实现跨平台、高度可移植性。
这个时候用proactor模式进行更上层的接口封装,完全采用异步回调通知模式,是一种较好地解决方案。
原文链接:/react/308109.html