001 package toolbus.communication;
002
003 import java.io.IOException;
004 import java.nio.channels.SelectableChannel;
005 import java.nio.channels.SelectionKey;
006 import java.nio.channels.Selector;
007 import java.nio.channels.SocketChannel;
008 import java.util.Iterator;
009 import java.util.Set;
010
011 import toolbus.logging.ILogger;
012 import toolbus.logging.IToolBusLoggerConstants;
013 import toolbus.logging.LoggerFactory;
014 import toolbus.util.concurrency.Latch;
015
016 /**
017 * This class handles the multiplexing of socket channels that we are registered for write
018 * operations.
019 *
020 * @author Arnold Lankamp
021 */
022 public class SocketWriteMultiplexer implements IWriteMultiplexer, Runnable{
023 private final Latch selectionPreventionLatch = new Latch();
024
025 private final AbstractConnectionHandler connectionHandler;
026 private final Selector selector;
027 private volatile boolean running = false;
028
029 /**
030 * Constructor.
031 *
032 * @param connectionHandler
033 * The connection handler this write multiplexer was created by.
034 */
035 public SocketWriteMultiplexer(AbstractConnectionHandler connectionHandler){
036 super();
037
038 this.connectionHandler = connectionHandler;
039
040 try{
041 selector = Selector.open();
042 }catch(IOException ioex){
043 LoggerFactory.log("Unable to create a selector for the write multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
044 throw new RuntimeException(ioex);
045 }
046
047 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
048 }
049
050 /**
051 * Checks if this multiplexer is running or not.
052 *
053 * @return True if this handler is running, false otherwise.
054 */
055 public boolean isRunning(){
056 return running;
057 }
058
059 /**
060 * Stops the execution of this multiplexer after then next iteration. The running state will be
061 * set to false and the selector will be forced to return directly.
062 */
063 public void stopRunning(){
064 running = false;
065 // Let the selector return immidiately
066 selector.wakeup();
067 }
068
069 /**
070 * The main loop of this multiplexer. It blocks until there is a registered channel which we are
071 * able to write to.
072 */
073 public void run(){
074 while(running){
075 boolean ready = false;
076 while(!ready){
077 try{
078 // This barrier is for preventing this thread from obtaining the monitor we need for
079 // registering a channel. The select() call obtains several monitors and then
080 // blocks. Effectively this causes just about every method of the selector (and
081 // associated objects) to block indefinately when the select() call is in progress!
082 // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
083 // community projects ... yeah NOT!
084 selectionPreventionLatch.await();
085 ready = true;
086 }catch(InterruptedException irex){
087 // Ignore
088 }
089 }
090
091 try{
092 selector.select(); // Wait till we got stuff to do or get woken up.
093 }catch(IOException ioex){
094 LoggerFactory.log("An exception occured during the select call in the write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
095 }
096
097 Set<SelectionKey> readyKeys = selector.selectedKeys();
098 Iterator<SelectionKey> iterator = readyKeys.iterator();
099 while(iterator.hasNext()){
100 SelectionKey key = iterator.next();
101 iterator.remove();
102
103 if(key.isWritable()){
104 try{
105 write(key);
106 }catch(RuntimeException rex){
107 // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
108 LoggerFactory.log("A runtime exception occured during the execution of the write multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
109 connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
110 }
111 }
112 }
113 }
114
115 // Close the selector once this thread stops running.
116 try{
117 selector.close();
118 }catch(IOException ioex){
119 LoggerFactory.log("Unable to close the selector of the write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
120 }
121 }
122
123 /**
124 * Notifies the with the selection key associated I/O handler that we are able to send data.
125 *
126 * @param key
127 * The key associated with the channel that we can write to.
128 */
129 private void write(SelectionKey key){
130 SelectableChannel channel = key.channel();
131
132 synchronized(channel){
133 SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
134
135 ioHandler.write();
136
137 // If there is not more data that needs to be written or the channel has been closed in the
138 // mean time, de-register it.
139 if(!ioHandler.hasMoreToWrite() || !channel.isOpen()) deregisterForWrite(channel);
140 }
141 }
142
143 /**
144 * @see IWriteMultiplexer#registerForWrite(SelectableChannel, SocketIOHandler)
145 */
146 public void registerForWrite(SelectableChannel channel, SocketIOHandler ioHandler){
147 selectionPreventionLatch.acquire();
148
149 try{
150 selector.wakeup();
151
152 synchronized(channel){
153 try{
154 SelectionKey key = channel.keyFor(selector);
155 if(key == null){
156 channel.register(selector, SelectionKey.OP_WRITE, ioHandler);
157 }else{
158 key.interestOps(SelectionKey.OP_WRITE);
159 key.attach(ioHandler);
160 }
161 }catch(IOException ioex){
162 LoggerFactory.log("Registering a channel for writing failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
163
164 connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
165 }
166 }
167 }finally{
168 selectionPreventionLatch.release();
169 }
170 }
171
172 /**
173 * @see IWriteMultiplexer#deregisterForWrite(SelectableChannel)
174 */
175 public void deregisterForWrite(SelectableChannel channel){
176 selectionPreventionLatch.acquire();
177
178 try{
179 selector.wakeup();
180
181 synchronized(channel){
182 SelectionKey key = channel.keyFor(selector);
183 if(key != null){
184 if(key.isValid()) key.interestOps(0);
185 else key.cancel();
186
187 // Remove the attachment (if any), so it can be GCed
188 key.attach(null);
189 }
190 }
191 }finally{
192 selectionPreventionLatch.release();
193 }
194 }
195 }