001 package toolbus.communication;
002
003 import java.io.IOException;
004 import java.nio.channels.SelectableChannel;
005 import java.nio.channels.SelectionKey;
006 import java.nio.channels.Selector;
007 import java.nio.channels.SocketChannel;
008 import java.util.Iterator;
009 import java.util.Set;
010
011 import toolbus.logging.ILogger;
012 import toolbus.logging.IToolBusLoggerConstants;
013 import toolbus.logging.LoggerFactory;
014 import toolbus.util.concurrency.Latch;
015
016 /**
017 * This class handles the multiplexing of socket channels that we are registered for either read or
018 * write operations.
019 *
020 * @author Arnold Lankamp
021 */
022 public class SocketReadWriteMultiplexer implements IReadMultiplexer, IWriteMultiplexer, Runnable{
023 private final Latch selectionPreventionLatch = new Latch();
024 private final Object registrationLock = new Object();
025
026 private final AbstractConnectionHandler connectionHandler;
027
028 private final Selector selector;
029 private volatile boolean running = false;
030
031 /**
032 * Constructor.
033 *
034 * @param connectionHandler
035 * The connection handler this multiplexer was created by.
036 */
037 public SocketReadWriteMultiplexer(AbstractConnectionHandler connectionHandler){
038 super();
039
040 this.connectionHandler = connectionHandler;
041
042 try{
043 selector = Selector.open();
044 }catch(IOException ioex){
045 LoggerFactory.log("Unable to create a selector for the read / write multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
046 throw new RuntimeException(ioex);
047 }
048
049 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
050 }
051
052 /**
053 * Checks if this multiplexer is running or not.
054 *
055 * @return True if this multiplexer is running, false otherwise.
056 */
057 public boolean isRunning(){
058 return running;
059 }
060
061 /**
062 * Stops the execution of this multiplexer after then next iteration. The running state will be
063 * set to false and the selector will be forced to return directly.
064 */
065 public void stopRunning(){
066 running = false;
067 }
068
069 /**
070 * The main loop of this multiplexer. It blocks until there is a registered channel which we are
071 * able to read or write from.
072 */
073 public void run(){
074 while(running){
075 boolean ready = false;
076 while(!ready){
077 try{
078 // This barrier is for preventing this thread from obtaining the monitor we need for
079 // registering a channel. The select() call obtains several monitors and then
080 // blocks. Effectively this causes just about every method of the selector (and
081 // associated objects) to block indefinately when the select() call is in progress!
082 // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
083 // community projects ... yeah NOT!
084 selectionPreventionLatch.await();
085 ready = true;
086 }catch(InterruptedException irex){
087 // Ignore
088 }
089 }
090
091 try{
092 selector.select(); // Wait till we got stuff to do or get woken up.
093 }catch(IOException ioex){
094 LoggerFactory.log("An exception occured during the select call in the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
095 }
096
097 Set<SelectionKey> readyKeys = selector.selectedKeys();
098 Iterator<SelectionKey> iterator = readyKeys.iterator();
099 while(iterator.hasNext()){
100 SelectionKey key = iterator.next();
101 iterator.remove();
102
103 try{
104 if(key.isReadable()){
105 read(key);
106 }
107 if(key.isValid() && key.isWritable()){
108 write(key);
109 }
110 }catch(RuntimeException rex){
111 // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
112 LoggerFactory.log("A runtime exception occured during the execution of the read multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
113 connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
114 }
115 }
116 }
117
118 // Close the selector once this thread stops running.
119 try{
120 selector.close();
121 }catch(IOException ioex){
122 LoggerFactory.log("Unable to close the selector of the read / write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
123 }
124 }
125
126 /**
127 * Notifies the with the selection key associated I/O handler that we are able to receive data.
128 *
129 * @param key
130 * The key associated with the channel that we can read from.
131 */
132 private void read(SelectionKey key){
133 SelectableChannel channel = key.channel();
134
135 synchronized(channel){
136 SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
137 ioHandler.read();
138 }
139 }
140
141 /**
142 * Notifies the with the selection key associated I/O handler that we are able to send data.
143 *
144 * @param key
145 * The key associated with the channel that we can write to.
146 */
147 private void write(SelectionKey key){
148 SelectableChannel channel = key.channel();
149
150 synchronized(registrationLock){
151 SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
152
153 ioHandler.write();
154
155 // If there is not more data that needs to be written or the channel has been closed in the
156 // mean time, de-register it.
157 if(!ioHandler.hasMoreToWrite() || !channel.isOpen()) deregisterForWrite(channel);
158 }
159 }
160
161 /**
162 * @see IReadMultiplexer#registerForRead(SelectableChannel, SocketIOHandler)
163 */
164 public void registerForRead(SelectableChannel channel, SocketIOHandler ioHandler){
165 selectionPreventionLatch.acquire();
166
167 try{
168 selector.wakeup();
169
170 synchronized(registrationLock){
171 try{
172 SelectionKey key = channel.keyFor(selector);
173 int interestOps = SelectionKey.OP_READ;
174 if(key != null) interestOps |= key.interestOps();
175 channel.register(selector, interestOps, ioHandler);
176 }catch(IOException ioex){
177 LoggerFactory.log("Registering a channel for reading failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
178
179 connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
180 }
181 }
182 }finally{
183 selectionPreventionLatch.release();
184 }
185 }
186
187 /**
188 * @see IReadMultiplexer#deregisterForRead(SelectableChannel)
189 */
190 public void deregisterForRead(SelectableChannel channel){
191 selectionPreventionLatch.acquire();
192
193 try{
194 selector.wakeup();
195
196 synchronized(registrationLock){
197 SelectionKey key = channel.keyFor(selector);
198 if(key != null){
199 if(key.isValid()){
200 key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
201
202 // Remove the attachment (if any), so it can be GCed
203 if(key.interestOps() == 0) key.attach(null);
204 }else{
205 key.cancel();
206 key.attach(null);
207 }
208 }
209 }
210 }finally{
211 selectionPreventionLatch.release();
212 }
213 }
214
215 /**
216 * @see IWriteMultiplexer#registerForWrite(SelectableChannel, SocketIOHandler)
217 */
218 public void registerForWrite(SelectableChannel channel, SocketIOHandler ioHandler){
219 selectionPreventionLatch.acquire();
220
221 try{
222 selector.wakeup();
223
224 synchronized(registrationLock){
225 try{
226 SelectionKey key = channel.keyFor(selector);
227 int interestOps = SelectionKey.OP_WRITE;
228 if(key != null) interestOps |= key.interestOps();
229 channel.register(selector, interestOps, ioHandler);
230 }catch(IOException ioex){
231 LoggerFactory.log("Registering a channel for writing failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
232
233 connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
234 }
235 }
236 }finally{
237 selectionPreventionLatch.release();
238 }
239 }
240
241 /**
242 * @see IWriteMultiplexer#deregisterForWrite(SelectableChannel)
243 */
244 public void deregisterForWrite(SelectableChannel channel){
245 selectionPreventionLatch.acquire();
246
247 try{
248 selector.wakeup();
249
250 synchronized(registrationLock){
251 SelectionKey key = channel.keyFor(selector);
252 if(key != null){
253 if(key.isValid()){
254 key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
255
256 // Remove the attachment (if any), so it can be GCed
257 if(key.interestOps() == 0) key.attach(null);
258 }else{
259 key.cancel();
260 key.attach(null);
261 }
262 }
263 }
264 }finally{
265 selectionPreventionLatch.release();
266 }
267 }
268 }