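    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;

    // PORT, MAX_INPUT and the body of process() are left elided, as in the slides.
    class Server implements Runnable {
        public void run() {
            try {
                ServerSocket ss = new ServerSocket(PORT);
                while (!Thread.interrupted())
                    // One new thread per accepted connection
                    new Thread(new Handler(ss.accept())).start();
                // or, single-threaded, or a thread pool
            } catch (IOException ex) { /* ... */ }
        }
    }

    class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);   // blocks until data arrives
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
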
The Reactor Pattern is an event handling design pattern used to address this issue. Here, one Reactor keeps looking for events and informs the corresponding event handler once an event gets triggered. To explain this I am using some Java code borrowed from the "Scalable IO in Java" lecture slides by Professor Doug Lea. To see his explanation, please go through that set of slides.
Java provides a standard API (java.nio) which can be used to design non-blocking IO systems. I will explain the Reactor pattern with a simple client-server model where the clients shout out their names to the server and the server responds to each client with a Hello message.
There are two important participants in the architecture of the Reactor Pattern.
1. Reactor
A Reactor runs in a separate thread and its job is to react to IO events by dispatching the work to the appropriate handler. It's like a telephone operator in a company who answers the calls from clients and transfers the communication line to the appropriate receiver. Don't go too far with the analogy though :).
2. Handlers
A Handler performs the actual work to be done with an IO event, similar to the officer in the company whom the calling client wants to speak to.
Since we are using the java.nio package, it's important to understand some of the classes used to implement the system. I will simply repeat some of the explanations by Doug Lea in his lecture slides to make the readers' lives easy :).
- Channels: These are connections to files, sockets etc. that support non-blocking reads. Just like many TV channels can be watched from one physical connection to the antenna, many java.nio.channels.SocketChannels, one for each client, can be made from a single java.nio.channels.ServerSocketChannel which is bound to a single port.
- Buffers: Array-like objects that can be directly read or written to by Channels.
- Selectors: Selectors tell which of a set of Channels has IO events.
- Selection Keys: Selection Keys maintain IO event status and bindings. A Selection Key represents the relationship between a Selector and a Channel. By looking at the Selection Key given by the Selector, the Reactor can decide what to do with the IO event which occurs on the Channel.
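To make the relationship between these classes a bit more concrete, here is a minimal sketch. The class name `NioBuildingBlocks`, its method names, and the use of port 0 (let the operating system pick any free port) are assumptions made for illustration; they are not part of the server built later in this post.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.charset.StandardCharsets;

class NioBuildingBlocks {

    /** Registers a listening Channel with a Selector and checks what the SelectionKey binds together. */
    static boolean keyLinksChannelAndSelector() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(0));   // port 0: any free port (assumption)
            server.configureBlocking(false);          // must be non-blocking before register()
            SelectionKey key = server.register(selector, SelectionKey.OP_ACCEPT);
            // The key remembers its Channel, its Selector and the interested operation.
            return key.channel() == server
                    && key.selector() == selector
                    && key.interestOps() == SelectionKey.OP_ACCEPT;
        }
    }

    /** Writes text into a Buffer, flips it to read mode and reads the same bytes back. */
    static String bufferRoundTrip(String text) {
        ByteBuffer buffer = ByteBuffer.allocate(64);
        buffer.put(text.getBytes(StandardCharsets.US_ASCII)); // fill the buffer
        buffer.flip();                                         // limit = bytes written, position = 0
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        return new String(out, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(keyLinksChannelAndSelector());
        System.out.println(bufferRoundTrip("Alice"));
    }
}
```

The point to take away is that the SelectionKey is the only object that knows both sides of the registration, which is why the Reactor can work purely from the keys the Selector hands back.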
Now let's try to understand what the Reactor Pattern is. Take a look at this diagram.
Here, there is a single ServerSocketChannel which is registered with a Selector. The SelectionKey 0 for this registration has information on what to do with the ServerSocketChannel if it gets an event. Obviously the ServerSocketChannel should receive events from incoming connection requests from clients. When a client requests a connection and wants to have a dedicated SocketChannel, the ServerSocketChannel should get triggered with an IO event. What does the Reactor have to do with this event? It simply has to Accept it to make a SocketChannel. Therefore SelectionKey 0 will be bound to an Acceptor, which is a special handler made to accept connections, so that the Reactor can figure out that the event should be dispatched to the Acceptor by looking at SelectionKey 0. Notice that ServerSocketChannel, SelectionKey 0 and Acceptor are all in the same colour (gray, I suppose :) ).
The Reactor is made to keep looking for IO events. When the Reactor calls the Selector.select() method, the Selector will provide a set of SelectionKeys for the channels which have pending events. When SelectionKey 0 is selected, it means that an event has occurred on the ServerSocketChannel. So the Reactor will dispatch the event to the Acceptor.
When the Acceptor accepts the connection from Client 1, it creates SocketChannel 1 for the client. This SocketChannel will be registered with the same Selector with SelectionKey 1. What would the client do with this SocketChannel? It will simply read from and write to the server. The server does not need to accept connections from client 1 any more since it already accepted the connection. Now what the server needs is to Read and Write data to the channel. So SelectionKey 1 will be bound to the Handler 1 object, which handles reading and writing. Notice that SocketChannel 1, SelectionKey 1 and Handler 1 are all in green.
The next time the Reactor calls Selector.select(), if the returned SelectionKey Set has SelectionKey 1 in it, it means that SocketChannel 1 is triggered with an event. Now by looking at SelectionKey 1, the Reactor knows that it has to dispatch the event to Handler 1, since Handler 1 is bound to SelectionKey 1. If the returned SelectionKey Set has SelectionKey 0 in it, it means that the ServerSocketChannel has received an event from another client, and by looking at SelectionKey 0 the Reactor knows that it has to dispatch the event to the Acceptor again. When the event is dispatched to the Acceptor, it will make SocketChannel 2 for client 2 and register the socket channel with the Selector with SelectionKey 2.
So in this scenario we are interested in 3 types of events.
- Connection request events, which get triggered on the ServerSocketChannel and which we need to Accept.
- Read events, which get triggered on SocketChannels when they have data to be read, from which we need to Read.
- Write events, which get triggered on SocketChannels when they are ready to be written with data, to which we need to Write.
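These three event types correspond to the `OP_ACCEPT`, `OP_READ` and `OP_WRITE` constants in `SelectionKey`, and each channel type reports which of them it supports through `validOps()`. The following is a small sketch of that check; the class name `ValidOpsSketch` and its helper methods are made up for this illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

class ValidOpsSketch {

    /** True when the bit for the given operation is set in the ops mask. */
    static boolean has(int ops, int op) {
        return (ops & op) != 0;
    }

    /** A ServerSocketChannel supports only connection request (accept) events. */
    static boolean serverChannelOnlyAccepts() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            return server.validOps() == SelectionKey.OP_ACCEPT;
        }
    }

    /** A SocketChannel supports read and write events, but not accept events. */
    static boolean socketChannelReadsAndWrites() throws IOException {
        try (SocketChannel socket = SocketChannel.open()) {
            int ops = socket.validOps();
            return has(ops, SelectionKey.OP_READ)
                    && has(ops, SelectionKey.OP_WRITE)
                    && !has(ops, SelectionKey.OP_ACCEPT);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(serverChannelOnlyAccepts());
        System.out.println(socketChannelReadsAndWrites());
    }
}
```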
Selection Key | Channel | Handler | Interested Operation |
---|---|---|---|
SelectionKey 0 | ServerSocketChannel | Acceptor | Accept |
SelectionKey 1 | SocketChannel 1 | Handler 1 | Read and Write |
SelectionKey 2 | SocketChannel 2 | Handler 2 | Read and Write |
SelectionKey 3 | SocketChannel 3 | Handler 3 | Read and Write |
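The bindings in the table can be tried out in isolation. The sketch below registers a listening channel with a stand-in Acceptor as the key's attachment, lets one client connect over the loopback interface, and then runs whatever is attached to the selected key. The class name `AttachmentDispatchSketch`, the lambda used in place of a real Acceptor, the ephemeral port and the 2 second timeout are all assumptions for this illustration.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;

class AttachmentDispatchSketch {

    /** Connects one client to a local server channel and returns the names of the handlers that ran. */
    static List<String> runOnce() throws IOException {
        List<String> log = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);

            // "SelectionKey 0": the server channel is bound to a stand-in Acceptor.
            Runnable acceptor = () -> log.add("Acceptor");
            server.register(selector, SelectionKey.OP_ACCEPT, acceptor);

            // A client connects, which raises a connection request event.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                selector.select(2000); // wait at most 2 s for the event
                for (SelectionKey key : selector.selectedKeys()) {
                    // The Reactor does not inspect the event; it runs the attachment.
                    ((Runnable) key.attachment()).run();
                }
                selector.selectedKeys().clear();
            }
        }
        return log;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(runOnce());
    }
}
```

Because the dispatch step only casts the attachment to `Runnable`, adding a new kind of handler does not require any change to the loop itself.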
We can add concurrency to our design to make the system more responsive. When the Reactor dispatches an event to a Handler, it can run the Handler in a new Thread so that the Reactor can happily continue to deal with other events. This generally helps when Handlers do slow or blocking work, since a long-running Handler would otherwise hold up every other connection. To limit the number of Threads in the system and to make things more organized, a Thread pool can be used.
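As a rough sketch of that idea (not the thread pool implementation from Part 3 of this series), the Reactor could hand each handler to a `java.util.concurrent.ExecutorService` instead of calling `run()` itself. The class name `PooledDispatchSketch`, the counter used as a stand-in for real handler work, and the 5 second shutdown wait are assumptions for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class PooledDispatchSketch {

    /** Dispatches the given number of stand-in handlers to a fixed-size pool and counts how many ran. */
    static int dispatchAll(int events, int poolSize) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < events; i++) {
            // Stand-in for a real Handler; it only counts that it was executed.
            Runnable handler = () -> handled.incrementAndGet();
            pool.execute(handler); // returns immediately, so the dispatching thread is free again
        }
        pool.shutdown();                            // accept no new work
        pool.awaitTermination(5, TimeUnit.SECONDS); // wait for queued handlers to finish
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(dispatchAll(10, 4));
    }
}
```

A fixed-size pool caps the number of threads no matter how many events arrive, which is the limit the paragraph above is asking for.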
I believe this explanation is adequate for us to get our hands dirty with some coding.
In this blog post I will explain the implementation of the Reactor Pattern with a simple Client - Server system where the server sends a Hello message to each client when it is told the client's name. The server will listen on port 9900 and multiple clients will connect to the server to shout out their names. A thread pool will not be used here; first let's run the server in a single thread. Part 3 of this series will explain how a Thread pool is used.
First let's make the Client, which connects to port 9900.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.PrintWriter;
    import java.net.Socket;
    import java.net.UnknownHostException;

    public class Client {

        String hostIp;
        int hostPort;

        public Client(String hostIp, int hostPort) {
            this.hostIp = hostIp;
            this.hostPort = hostPort;
        }

        public void runClient() throws IOException {
            Socket clientSocket = null;
            PrintWriter out = null;
            BufferedReader in = null;

            try {
                clientSocket = new Socket(hostIp, hostPort);
                // true = auto-flush, so every println() is sent immediately
                out = new PrintWriter(clientSocket.getOutputStream(), true);
                in = new BufferedReader(
                        new InputStreamReader(clientSocket.getInputStream()));
            } catch (UnknownHostException e) {
                System.err.println("Unknown host: " + hostIp);
                System.exit(1);
            } catch (IOException e) {
                System.err.println("Couldn't connect to: " + hostIp);
                System.exit(1);
            }

            BufferedReader stdIn = new BufferedReader(
                    new InputStreamReader(System.in));
            String userInput;

            System.out.println("Client connected to host : " + hostIp + " port: " + hostPort);
            System.out.println("Type (\"Bye\" to quit)");
            System.out.println("Tell what your name is to the Server.....");

            while ((userInput = stdIn.readLine()) != null) {
                out.println(userInput);

                // Break when client says Bye.
                if (userInput.equalsIgnoreCase("Bye"))
                    break;

                System.out.println("Server says: " + in.readLine());
            }

            out.close();
            in.close();
            stdIn.close();
            clientSocket.close();
        }

        public static void main(String[] args) throws IOException {
            Client client = new Client("127.0.0.1", 9900);
            client.runClient();
        }
    }

Notice that the client doesn't use java.nio to create the Socket. It simply uses a java.net.Socket everybody knows about.
Now let's make the Reactor in the Server.

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class Reactor implements Runnable {

        final Selector selector;
        final ServerSocketChannel serverSocketChannel;
        final boolean isWithThreadPool;

        Reactor(int port, boolean isWithThreadPool) throws IOException {
            this.isWithThreadPool = isWithThreadPool;
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            serverSocketChannel.configureBlocking(false);
            // SelectionKey 0: connection requests go to the Acceptor
            SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            selectionKey0.attach(new Acceptor());
        }

        public void run() {
            System.out.println("Server listening to port: " + serverSocketChannel.socket().getLocalPort());
            try {
                while (!Thread.interrupted()) {
                    selector.select(); // blocks until at least one channel has an event
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        void dispatch(SelectionKey k) {
            Runnable r = (Runnable) (k.attachment());
            if (r != null) {
                r.run();
            }
        }

        class Acceptor implements Runnable {
            public void run() {
                try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel != null) {
                        // HandlerWithThreadPool is not shown in this post (see Part 3)
                        if (isWithThreadPool)
                            new HandlerWithThreadPool(selector, socketChannel);
                        else
                            new Handler(selector, socketChannel);
                    }
                    System.out.println("Connection Accepted by Reactor");
                } catch (IOException ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

The Reactor is a Runnable. See the while loop in the run() method. It calls selector.select() to get the SelectionKeys which have pending IO events. When the SelectionKeys are selected, they are dispatched one by one. See the dispatch() method. Each SelectionKey has an attachment which is also a Runnable. This attachment will be either an Acceptor or a Handler.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;

    public class Handler implements Runnable {

        final SocketChannel socketChannel;
        final SelectionKey selectionKey;
        ByteBuffer input = ByteBuffer.allocate(1024);
        static final int READING = 0, SENDING = 1;
        int state = READING;
        String clientName = "";

        Handler(Selector selector, SocketChannel c) throws IOException {
            socketChannel = c;
            c.configureBlocking(false);
            // Register with no interest first, attach this Handler, then ask for read events
            selectionKey = socketChannel.register(selector, 0);
            selectionKey.attach(this);
            selectionKey.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
        }

        public void run() {
            try {
                if (state == READING) {
                    read();
                } else if (state == SENDING) {
                    send();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }

        void read() throws IOException {
            int readCount = socketChannel.read(input);
            if (readCount < 0) {
                // Client closed the connection: stop selecting this channel
                selectionKey.cancel();
                socketChannel.close();
                return;
            }
            if (readCount > 0) {
                readProcess(readCount);
            }
            state = SENDING;
            // Interested in writing
            selectionKey.interestOps(SelectionKey.OP_WRITE);
        }

        /**
         * Processing of the read message. This only extracts the client's name
         * from the bytes that were read.
         *
         * @param readCount number of bytes read into the input buffer
         */
        synchronized void readProcess(int readCount) {
            StringBuilder sb = new StringBuilder();
            input.flip();
            byte[] subStringBytes = new byte[readCount];
            byte[] array = input.array();
            System.arraycopy(array, 0, subStringBytes, 0, readCount);
            // Assuming ASCII (bad assumption but simplifies the example)
            sb.append(new String(subStringBytes));
            input.clear();
            clientName = sb.toString().trim();
        }

        void send() throws IOException {
            System.out.println("Saying hello to " + clientName);
            ByteBuffer output = ByteBuffer.wrap(("Hello " + clientName + "\n").getBytes());
            socketChannel.write(output);
            selectionKey.interestOps(SelectionKey.OP_READ);
            state = READING;
        }
    }

A Handler has 2 states, READING and SENDING. This Handler deals with only one of them at a time: the conversation is strictly a request followed by a response, so at any moment the Handler is interested in just one operation on its Channel. Since it's the client who speaks first, a server Handler starts in the READING state. Notice how this Handler is attached to the SelectionKey and how the Interested Operation is set to OP_READ. This means that the Selector should only select this SelectionKey when a Read Event occurs. Once the read process is done, the Handler changes its state to SENDING and changes the Interested Operation to OP_WRITE. Now the Selector will select this SelectionKey only when it gets a Write Event from the Channel, that is, when the Channel is ready to be written with data. When a Write Event is dispatched to this Handler, it writes the Hello message to the output buffer, since the state is now SENDING. Once sending is done, it changes back to the READING state with the Interested Operation changed to OP_READ again. It should be obvious that since both Acceptor and Handler are Runnables, the dispatch() method of the Reactor can execute the run() method of any attachment it gets from a selected SelectionKey.
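The switch between the two Interested Operations can be observed without any network traffic. The sketch below registers an unconnected, non-blocking SocketChannel and replays the same sequence of `interestOps()` calls that the Handler makes; the class name `InterestOpsToggleSketch` and the idea of returning the three observed values as an array are assumptions for this illustration.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

class InterestOpsToggleSketch {

    /** Returns the interest set after registration, after a read, and after a send. */
    static int[] toggle() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, 0); // no interest yet

            key.interestOps(SelectionKey.OP_READ);   // READING: wait for the client's name
            int afterRegister = key.interestOps();

            key.interestOps(SelectionKey.OP_WRITE);  // SENDING: wait until the channel is writable
            int afterRead = key.interestOps();

            key.interestOps(SelectionKey.OP_READ);   // back to READING for the next name
            int afterSend = key.interestOps();

            return new int[] {afterRegister, afterRead, afterSend};
        }
    }

    public static void main(String[] args) throws IOException {
        int[] ops = toggle();
        System.out.println(ops[0] == SelectionKey.OP_READ);
        System.out.println(ops[1] == SelectionKey.OP_WRITE);
        System.out.println(ops[2] == SelectionKey.OP_READ);
    }
}
```

Each call to `interestOps(int)` replaces the previous interest set rather than adding to it, which is what keeps the Handler interested in exactly one operation at a time.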
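
    public static void main(String[] args) throws IOException {
        // Port 9900, without the thread pool
        Reactor reactor = new Reactor(9900, false);
        new Thread(reactor).start();
    }
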
To see how this works, first run the server. Then run several clients and see how they get connected to the server. When a name is typed into a client's standard input, the server will respond to that client with a Hello message. Notice that the server runs in a single Thread but responds to any number of clients which connect to it.
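For readers who want to check the single-thread claim without opening several terminals, here is a compact, self-contained sketch. It is not the Reactor and Handler classes from above: it folds accepting, reading and replying into one loop, answers each client once and then closes that connection. The class name `SingleThreadGreeterSketch`, the ephemeral loopback port and the 2 second timeouts are assumptions for this illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class SingleThreadGreeterSketch {

    /** Connects one client per name, serves all of them from the calling thread, returns the replies. */
    static List<String> greet(List<String> names) throws IOException {
        List<Socket> clients = new ArrayList<>();
        List<String> replies = new ArrayList<>();
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.configureBlocking(false);
            server.register(selector, SelectionKey.OP_ACCEPT);
            int port = server.socket().getLocalPort();

            // Every client connects and sends its name before the server loop starts.
            for (String name : names) {
                Socket client = new Socket(InetAddress.getLoopbackAddress(), port);
                client.setSoTimeout(2000); // never block forever while reading the reply
                client.getOutputStream().write((name + "\n").getBytes(StandardCharsets.US_ASCII));
                clients.add(client);
            }

            int answered = 0;
            while (answered < names.size() && selector.select(2000) > 0) {
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        SocketChannel channel = server.accept();
                        if (channel != null) {
                            channel.configureBlocking(false);
                            channel.register(selector, SelectionKey.OP_READ);
                        }
                    } else if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer input = ByteBuffer.allocate(1024);
                        int count = channel.read(input);
                        if (count > 0) {
                            String name = new String(input.array(), 0, count,
                                    StandardCharsets.US_ASCII).trim();
                            channel.write(ByteBuffer.wrap(
                                    ("Hello " + name + "\n").getBytes(StandardCharsets.US_ASCII)));
                            answered++;
                        }
                        channel.close(); // one reply per client; closing also cancels the key
                    }
                }
            }

            // Collect the replies in the order the clients were created.
            for (Socket client : clients) {
                BufferedReader in = new BufferedReader(
                        new InputStreamReader(client.getInputStream(), StandardCharsets.US_ASCII));
                replies.add(in.readLine());
            }
        } finally {
            for (Socket client : clients) {
                client.close();
            }
        }
        return replies;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(greet(List.of("Alice", "Bob")));
    }
}
```

The sketch relies on short names arriving in a single read, which is reasonable on the loopback interface but is a simplification, just like the ASCII assumption in the Handler.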