001 package toolbus.communication;
002
003 import java.io.IOException;
004 import java.nio.channels.SelectableChannel;
005 import java.nio.channels.SelectionKey;
006 import java.nio.channels.Selector;
007 import java.nio.channels.SocketChannel;
008 import java.util.Iterator;
009 import java.util.Set;
010
011 import toolbus.logging.ILogger;
012 import toolbus.logging.IToolBusLoggerConstants;
013 import toolbus.logging.LoggerFactory;
014 import toolbus.util.concurrency.Latch;
015
016 /**
017 * This class handles the multiplexing of socket channels that we are registered for read
018 * operations.
019 *
020 * @author Arnold Lankamp
021 */
022 public class SocketReadMultiplexer implements IReadMultiplexer, Runnable{
023 private final Latch selectionPreventionLatch = new Latch();
024
025 private final AbstractConnectionHandler connectionHandler;
026
027 private final Selector selector;
028 private volatile boolean running = false;
029
030 /**
031 * Constructor.
032 *
033 * @param connectionHandler
034 * The connection handler this read multiplexer was created by.
035 */
036 public SocketReadMultiplexer(AbstractConnectionHandler connectionHandler){
037 super();
038
039 this.connectionHandler = connectionHandler;
040
041 try{
042 selector = Selector.open();
043 }catch(IOException ioex){
044 LoggerFactory.log("Unable to create a selector for the read multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
045 throw new RuntimeException(ioex);
046 }
047
048 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
049 }
050
051 /**
052 * Checks if this multiplexer is running or not.
053 *
054 * @return True if this handler is running, false otherwise.
055 */
056 public boolean isRunning(){
057 return running;
058 }
059
060 /**
061 * Stops the execution of this multiplexer after then next iteration. The running state will be
062 * set to false and the selector will be forced to return directly.
063 */
064 public void stopRunning(){
065 running = false;
066 // Let the selector return immidiately
067 selector.wakeup();
068 }
069
070 /**
071 * The main loop of this multiplexer. It blocks until there is a registered channel which we are
072 * able to read from.
073 */
074 public void run(){
075 while(running){
076 boolean ready = false;
077 while(!ready){
078 try{
079 // This barrier is for preventing this thread from obtaining the monitor we need for
080 // registering a channel. The select() call obtains several monitors and then
081 // blocks. Effectively this causes just about every method of the selector (and
082 // associated objects) to block indefinately when the select() call is in progress!
083 // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
084 // community projects ... yeah NOT!
085 selectionPreventionLatch.await();
086 ready = true;
087 }catch(InterruptedException irex){
088 // Ignore
089 }
090 }
091
092 try{
093 selector.select(); // Wait till we got stuff to do or get woken up.
094 }catch(IOException ioex){
095 LoggerFactory.log("An exception occured during the select call in the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
096 }
097
098 Set<SelectionKey> readyKeys = selector.selectedKeys();
099 Iterator<SelectionKey> iterator = readyKeys.iterator();
100 while(iterator.hasNext()){
101 SelectionKey key = iterator.next();
102 iterator.remove();
103
104 try{
105 if(key.isReadable()){
106 read(key);
107 }
108 }catch(RuntimeException rex){
109 // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
110 LoggerFactory.log("A runtime exception occured during the execution of the read multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
111 connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
112 }
113 }
114 }
115
116 // Close the selector once this thread stops running.
117 try{
118 selector.close();
119 }catch(IOException ioex){
120 LoggerFactory.log("Unable to close the selector of the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
121 }
122 }
123
124 /**
125 * Notifies the with the selection key associated I/O handler that we are able to receive data.
126 *
127 * @param key
128 * The key associated with the channel that we can read from.
129 */
130 private void read(SelectionKey key){
131 SelectableChannel channel = key.channel();
132
133 synchronized(channel){
134 SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
135 ioHandler.read();
136 }
137 }
138
139 /**
140 * @see IReadMultiplexer#registerForRead(SelectableChannel, SocketIOHandler)
141 */
142 public void registerForRead(SelectableChannel channel, SocketIOHandler ioHandler){
143 selectionPreventionLatch.acquire();
144
145 try{
146 selector.wakeup();
147
148 synchronized(channel){
149 try{
150 SelectionKey key = channel.keyFor(selector);
151 if(key == null){
152 channel.register(selector, SelectionKey.OP_READ, ioHandler);
153 }else{
154 key.interestOps(SelectionKey.OP_READ);
155 key.attach(ioHandler);
156 }
157 }catch(IOException ioex){
158 LoggerFactory.log("Registering a channel for reading failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
159
160 connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
161 }
162 }
163 }finally{
164 selectionPreventionLatch.release();
165 }
166 }
167
168 /**
169 * @see IReadMultiplexer#deregisterForRead(SelectableChannel)
170 */
171 public void deregisterForRead(SelectableChannel channel){
172 selectionPreventionLatch.acquire();
173
174 try{
175 selector.wakeup();
176
177 synchronized(channel){
178 SelectionKey key = channel.keyFor(selector);
179
180 if(key != null){
181 if(key.isValid()) key.interestOps(0);
182 else key.cancel();
183
184 // Remove the attachment (if any), so it can be GCed
185 key.attach(null);
186 }
187 }
188 }finally{
189 selectionPreventionLatch.release();
190 }
191 }
192 }