001 package toolbus.communication; 002 003 import java.io.IOException; 004 import java.nio.channels.SelectableChannel; 005 import java.nio.channels.SelectionKey; 006 import java.nio.channels.Selector; 007 import java.nio.channels.SocketChannel; 008 import java.util.Iterator; 009 import java.util.Set; 010 011 import toolbus.logging.ILogger; 012 import toolbus.logging.IToolBusLoggerConstants; 013 import toolbus.logging.LoggerFactory; 014 import toolbus.util.concurrency.Latch; 015 016 /** 017 * This class handles the multiplexing of socket channels that we are registered for read 018 * operations. 019 * 020 * @author Arnold Lankamp 021 */ 022 public class SocketReadMultiplexer implements IReadMultiplexer, Runnable{ 023 private final Latch selectionPreventionLatch = new Latch(); 024 025 private final AbstractConnectionHandler connectionHandler; 026 027 private final Selector selector; 028 private volatile boolean running = false; 029 030 /** 031 * Constructor. 032 * 033 * @param connectionHandler 034 * The connection handler this read multiplexer was created by. 035 */ 036 public SocketReadMultiplexer(AbstractConnectionHandler connectionHandler){ 037 super(); 038 039 this.connectionHandler = connectionHandler; 040 041 try{ 042 selector = Selector.open(); 043 }catch(IOException ioex){ 044 LoggerFactory.log("Unable to create a selector for the read multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION); 045 throw new RuntimeException(ioex); 046 } 047 048 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem. 049 } 050 051 /** 052 * Checks if this multiplexer is running or not. 053 * 054 * @return True if this handler is running, false otherwise. 055 */ 056 public boolean isRunning(){ 057 return running; 058 } 059 060 /** 061 * Stops the execution of this multiplexer after then next iteration. The running state will be 062 * set to false and the selector will be forced to return directly. 063 */ 064 public void stopRunning(){ 065 running = false; 066 // Let the selector return immidiately 067 selector.wakeup(); 068 } 069 070 /** 071 * The main loop of this multiplexer. It blocks until there is a registered channel which we are 072 * able to read from. 073 */ 074 public void run(){ 075 while(running){ 076 boolean ready = false; 077 while(!ready){ 078 try{ 079 // This barrier is for preventing this thread from obtaining the monitor we need for 080 // registering a channel. The select() call obtains several monitors and then 081 // blocks. Effectively this causes just about every method of the selector (and 082 // associated objects) to block indefinately when the select() call is in progress! 083 // Thanks Sun for this (incredibly) crappy design / implementation. Long live the 084 // community projects ... yeah NOT! 085 selectionPreventionLatch.await(); 086 ready = true; 087 }catch(InterruptedException irex){ 088 // Ignore 089 } 090 } 091 092 try{ 093 selector.select(); // Wait till we got stuff to do or get woken up. 094 }catch(IOException ioex){ 095 LoggerFactory.log("An exception occured during the select call in the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 096 } 097 098 Set<SelectionKey> readyKeys = selector.selectedKeys(); 099 Iterator<SelectionKey> iterator = readyKeys.iterator(); 100 while(iterator.hasNext()){ 101 SelectionKey key = iterator.next(); 102 iterator.remove(); 103 104 try{ 105 if(key.isReadable()){ 106 read(key); 107 } 108 }catch(RuntimeException rex){ 109 // Catch all RuntimeExceptions. We don't want to risk the termination of this thread. 110 LoggerFactory.log("A runtime exception occured during the execution of the read multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 111 connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment()); 112 } 113 } 114 } 115 116 // Close the selector once this thread stops running. 117 try{ 118 selector.close(); 119 }catch(IOException ioex){ 120 LoggerFactory.log("Unable to close the selector of the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 121 } 122 } 123 124 /** 125 * Notifies the with the selection key associated I/O handler that we are able to receive data. 126 * 127 * @param key 128 * The key associated with the channel that we can read from. 129 */ 130 private void read(SelectionKey key){ 131 SelectableChannel channel = key.channel(); 132 133 synchronized(channel){ 134 SocketIOHandler ioHandler = (SocketIOHandler) key.attachment(); 135 ioHandler.read(); 136 } 137 } 138 139 /** 140 * @see IReadMultiplexer#registerForRead(SelectableChannel, SocketIOHandler) 141 */ 142 public void registerForRead(SelectableChannel channel, SocketIOHandler ioHandler){ 143 selectionPreventionLatch.acquire(); 144 145 try{ 146 selector.wakeup(); 147 148 synchronized(channel){ 149 try{ 150 SelectionKey key = channel.keyFor(selector); 151 if(key == null){ 152 channel.register(selector, SelectionKey.OP_READ, ioHandler); 153 }else{ 154 key.interestOps(SelectionKey.OP_READ); 155 key.attach(ioHandler); 156 } 157 }catch(IOException ioex){ 158 LoggerFactory.log("Registering a channel for reading failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 159 160 connectionHandler.closeDueToException((SocketChannel) channel, ioHandler); 161 } 162 } 163 }finally{ 164 selectionPreventionLatch.release(); 165 } 166 } 167 168 /** 169 * @see IReadMultiplexer#deregisterForRead(SelectableChannel) 170 */ 171 public void deregisterForRead(SelectableChannel channel){ 172 selectionPreventionLatch.acquire(); 173 174 try{ 175 selector.wakeup(); 176 177 synchronized(channel){ 178 SelectionKey key = channel.keyFor(selector); 179 180 if(key != null){ 181 if(key.isValid()) key.interestOps(0); 182 else key.cancel(); 183 184 // Remove the attachment (if any), so it can be GCed 185 key.attach(null); 186 } 187 } 188 }finally{ 189 selectionPreventionLatch.release(); 190 } 191 } 192 }