JavaNIO非堵塞技术实际是采取反应器模式,或者说是观察者(observer)模式为我们监察I/O端口,如果有内容进来,会自动通知我们,这样,我们就不必开启多个线程死等,从外界看,实现了流畅的I/O读写,不堵塞了。
同步和异步区别:有无通知(是否轮询),异步:处理完后会通知请求方
堵塞和非堵塞区别:操作结果是否等待(是否马上有返回值),只是设计方式的不同,堵塞:需要等待完成后,开始下面的工作
NIO 有一个主要的类Selector,这个类似一个观察者,只要我们把需要探知的socketchannel告诉Selector,我们接着做别的事情,当有事件发生时,他会通知我们,传回一组SelectionKey,我们读取这些Key,就会获得我们刚刚注册过的socketchannel,然后,我们从这个Channel中读取数据,接着我们可以处理这些数据。
反应器模式与观察者模式在某些方面极为相似:当一个主体发生改变时,所有依属体都得到通知。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。
一般模型
我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。
传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。
理解:
Reactor这个词译成汉语还真没有什么合适的,很多地方叫反应器模式,但更多好像就直接叫reactor模式了,其实我觉着叫应答者模式更好理解一些。通过了解,这个模式更像一个侍卫,一直在等待你的召唤,或者叫召唤兽。
并发系统常使用reactor模式,代替常用的多线程的处理方式,节省系统的资源,提高系统的吞吐量。
先用比较直观的方式来介绍一下这种方式的优点,通过和常用的多线程方式比较一下,可能更好理解。
以一个餐饮为例,每一个人来就餐就是一个事件,他会先看一下菜单,然后点餐。就像一个网站会有很多的请求,要求服务器做一些事情。处理这些就餐事件的就需要我们的服务人员了。
在多线程处理的方式会是这样的:
一个人来就餐,一个服务员去服务,然后客人会看菜单,点菜。 服务员将菜单给后厨。
二个人来就餐,二个服务员去服务……
五个人来就餐,五个服务员去服务……
这个就是多线程的处理方式,一个事件到来,就会有一个线程服务。很显然这种方式在人少的情况下会有很好的用户体验,每个客人都感觉自己是VIP,专人服务的。如果餐厅一直这样同一时间最多来5个客人,这家餐厅是可以很好的服务下去的。
来了一个好消息,因为这家店的服务好,吃饭的人多了起来。同一时间会来10个客人,老板很开心,但是只有5个服务员,这样就不能一对一服务了,有些客人就要没有人管了。老板就又请了5个服务员,现在好了,又能每个人都受VIP待遇了。
越来越多的人对这家餐厅满意,客源又多了,同时来吃饭的人到了20人,老板高兴不起来了,再请服务员吧,占地方不说,还要开工钱,再请人就攒不到钱了。怎么办呢?老板想了想,10个服务员对付20个客人也是能对付过来的,服务员勤快点就好了,伺候完一个客人马上伺候另外一个,还是来得及的。综合考虑了一下,老板决定就使用10个服务人员的线程池啦~~~
但是这样有一个比较严重的缺点就是,如果正在接受服务员服务的客人点菜很慢,其他的客人可能就要等好长时间了。有些火爆脾气的客人可能就等不了走人了。
Reactor如何处理这个问题呢:
老板后来发现,客人点菜比较慢,大部服务员都在等着客人点菜,其实干的活不是太多。老板能当老板当然有点不一样的地方,终于发现了一个新的方法,那就是:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”,马上就有个服务员过去服务。嘿嘿,然后在老板有了这个新的方法之后,就进行了一次裁员,只留了一个服务员!这就是用单个线程来做多线程的事。
实际的餐馆都是用的Reactor模式在服务。一些设计的模型其实都是从生活中来的。
Reactor模式主要是提高系统的吞吐量,在有限的资源下处理更多的事情。
在单核的机上,多线程并不能提高系统的性能,除非在有一些阻塞的情况发生。否则线程切换的开销会使处理的速度变慢。就像你一个人做两件事情,1、削一个苹果。2、切一个西瓜。那你可以一件一件的做,我想你也会一件一件的做。如果这个时候你使用多线程,一会儿削苹果,一会切西瓜,可以相像究竟是哪个速度快。这也就是说为什么在单核机上多线程来处理可能会更慢。
但当有阻碍操作发生时,多线程的优势才会显示出来,现在你有另外两件事情去做,1、削一个苹果。2、烧一壶开水。我想没有人会去做完一件再做另一件,你肯定会一边烧水,一边就把苹果削了。
package com.linxcool.reactor;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
/**
* 反应器模式
* 用于解决多用户访问并发问题
*
* 举个例子:餐厅服务问题
*
* 传统线程池做法:来一个客人(请求)去一个服务员(线程)
* 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员”
*
* @author linxcool
*/
public class Reactor implements Runnable{
public final Selector selector;
public final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException{
selector=Selector.open();
serverSocketChannel=ServerSocketChannel.open();
InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);
serverSocketChannel.socket().bind(inetSocketAddress);
serverSocketChannel.configureBlocking(false);
//向selector注册该channel
SelectionKey selectionKey=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
//利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor
selectionKey.attach(new Acceptor(this));
}
@Override
public void run() {
try {
while(!Thread.interrupted()){
selector.select();
Set<SelectionKey> selectionKeys= selector.selectedKeys();
Iterator<SelectionKey> it=selectionKeys.iterator();
//Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
while(it.hasNext()){
//来一个事件 第一次触发一个accepter线程
//以后触发SocketReadHandler
SelectionKey selectionKey=it.next();
dispatch(selectionKey);
selectionKeys.clear();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 运行Acceptor或SocketReadHandler
* @param key
*/
void dispatch(SelectionKey key) {
Runnable r = (Runnable)(key.attachment());
if (r != null){
r.run();
}
}
}
package com.linxcool.reactor;
import java.io.IOException;
import java.nio.channels.SocketChannel;
public class Acceptor implements Runnable{
private Reactor reactor;
public Acceptor(Reactor reactor){
this.reactor=reactor;
}
@Override
public void run() {
try {
SocketChannel socketChannel=reactor.serverSocketChannel.accept();
if(socketChannel!=null)//调用Handler来处理channel
new SocketReadHandler(reactor.selector,socketChannel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
package com.linxcool.reactor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; public class SocketReadHandler implements Runnable{ private SocketChannel socketChannel; public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{ this.socketChannel=socketChannel; socketChannel.configureBlocking(false); SelectionKey selectionKey=socketChannel.register(selector,0); //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。 //参看dispatch(SelectionKey key) selectionKey.attach(this); //同时将SelectionKey标记为可读,以便读取。 selectionKey.interestOps(SelectionKey.OP_READ); selector.wakeup(); } /** * 处理读取数据 */ @Override public void run() { ByteBuffer inputBuffer=ByteBuffer.allocate(1024); inputBuffer.clear(); try { socketChannel.read(inputBuffer); //激活线程池 处理这些request //requestHandle(new Request(socket,btt)); } catch (IOException e) { e.printStackTrace(); } } }