Reactor
在讲解Mina之前,我们先了解下Reactor模式,从一个简单例子入手,这样做的原因是,通过简单例子,我们就能快准狠地了解Mina的内部原理。
上一遍讲了Reacto相关的概念。其实主要有个三个概念:
1. 事件
2. 事件分发器
3. 事件处理器
事件主要是指各种IO事件,在通讯这一层主要有 accept,read,write 三个IO事件;
事件分发器主要工作是接收事件并派发给事件处理器来处理,起调度中心的作用;
事件处理器主要负责业务的逻辑处理,是我们最关注的部分;
服务端
下面的代码是服务端代码示例,主线程负责accept事件和事件派发工作,收到一个client连接后,会生成一个SocketReadHanler 任务,同时绑定read事件到这个SocketReadHanler上。
public class Reactor implements Runnable {
//selector
private final Selector selector;
//socket
private final ServerSocketChannel serverSocketChannel;
//线程池
private final ExecutorService service = Executors
.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 10);
/** * @throws IOException * */
public Reactor(int port) throws IOException {
selector = Selector.open();
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress("localhost",port);
serverSocketChannel.socket().bind(address);
serverSocketChannel.configureBlocking(false);
//register the channel
SelectionKey selectionKey = serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
System.out.println("--> start serverSocket.register");
//attache the acceptor
selectionKey.attach(new Acceptor());
System.out.println("-->attach new Acceptor");
}
/** * 事件到到后触发handler执行 */
class Acceptor implements Runnable {
/** * @see java.lang.Runnable#run() */
@Override
public void run() {
try {
System.out.println("-->ready for accept");
SocketChannel channel = serverSocketChannel.accept();
if (channel != null) {
new SocketReadHanler(selector,channel);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/** * @see java.lang.Runnable#run() */
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
//it will execute if read event or accept event occurs
while (it.hasNext()) {
//when one event comes,it will trigger the accept thread in the first time and trigger the SockerReadHandler after the first time
SelectionKey selectionKey = it.next();
dispatch(selectionKey);
}
keys.clear();
}
} catch (Exception e) {
e.printStackTrace();
}
}
/** * event dispatcher,* 不同的handler绑定不同IO事件, * 这样事件到达后直接触发handler执行 * * @param key */
void dispatch(SelectionKey key) {
Runnable runnable = (Runnable) key.attachment();
//放到线程池中,执行某个job
Thread thread = new Thread(runnable,"thread_" + System.currentTimeMillis());
service.execute(thread);
}
}
SocketReadHanler 负责业务逻辑处理,它会绑定read监听事件到selector上。
public class SocketReadHanler implements Runnable {
private final SocketChannel channel;
private final SelectionKey key;
private static final int READING = 0;
int state = READING;
/** * 绑定IO事件到具体handler * @param sel * @param channel * @throws IOException */
public SocketReadHanler(Selector sel,SocketChannel channel) throws IOException {
this.channel = channel;
this.channel.configureBlocking(false);
key = this.channel.register(sel,0);
//将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
//binding
key.attach(this);
//read
key.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
/** * @see java.lang.Runnable#run() */
@Override
public void run() {
try {
ByteBuffer clientBuffer = ByteBuffer.allocate(4096);
if (key.isReadable()) { // 读信息
System.out.println("read:" + Thread.currentThread().getName());
int count = channel.read(clientBuffer); // 将数据读入clientBuffer
if (count > 0) { // 当有数据读入时
System.out.println("wait 30s");
Thread.sleep(30000);
clientBuffer.flip(); // 反转此缓冲区
// 如果需要,对缓冲区中的字符进行解码
//处理数据
System.out.println(getChars(clientBuffer.array()));
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
private char[] getChars(byte[] bytes) {//将字节转为字符(解码)
Charset cs = Charset.forName("UTF-8");
ByteBuffer bb = ByteBuffer.allocate(bytes.length);
bb.put(bytes);
bb.flip();
CharBuffer cb = cs.decode(bb);
return cb.array();
}
}
main函数:
public class ReactorTest {
public static void main(String[] args) {
try {
Reactor reactor = new Reactor(9527);
reactor.run();
} catch (IOException e) {
e.printStackTrace();
}
}
}
从上面可以看出,各个io事件都绑定到一个selector上,不同的io事件到达后触发不同的handler处理。handler线程放到一个线程池中,保证线程的数量不会太大。