001    package toolbus.communication;
002    
003    import java.io.IOException;
004    import java.nio.channels.SelectableChannel;
005    import java.nio.channels.SelectionKey;
006    import java.nio.channels.Selector;
007    import java.nio.channels.SocketChannel;
008    import java.util.Iterator;
009    import java.util.Set;
010    
011    import toolbus.logging.ILogger;
012    import toolbus.logging.IToolBusLoggerConstants;
013    import toolbus.logging.LoggerFactory;
014    import toolbus.util.concurrency.Latch;
015    
016    /**
017     * This class handles the multiplexing of socket channels that we are registered for either read or
018     * write operations.
019     * 
020     * @author Arnold Lankamp
021     */
022    public class SocketReadWriteMultiplexer implements IReadMultiplexer, IWriteMultiplexer, Runnable{
023            private final Latch selectionPreventionLatch = new Latch();
024            private final Object registrationLock = new Object();
025            
026            private final AbstractConnectionHandler connectionHandler;
027            
028            private final Selector selector;
029            private volatile boolean running = false;
030    
031            /**
032             * Constructor.
033             * 
034             * @param connectionHandler
035             *            The connection handler this multiplexer was created by.
036             */
037            public SocketReadWriteMultiplexer(AbstractConnectionHandler connectionHandler){
038                    super();
039    
040                    this.connectionHandler = connectionHandler;
041    
042                    try{
043                            selector = Selector.open();
044                    }catch(IOException ioex){
045                            LoggerFactory.log("Unable to create a selector for the read / write multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
046                            throw new RuntimeException(ioex);
047                    }
048                    
049                    running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
050            }
051    
052            /**
053             * Checks if this multiplexer is running or not.
054             * 
055             * @return True if this multiplexer is running, false otherwise.
056             */
057            public boolean isRunning(){
058                    return running;
059            }
060    
061            /**
062             * Stops the execution of this multiplexer after then next iteration. The running state will be
063             * set to false and the selector will be forced to return directly.
064             */
065            public void stopRunning(){
066                    running = false;
067            }
068    
069            /**
070             * The main loop of this multiplexer. It blocks until there is a registered channel which we are
071             * able to read or write from.
072             */
073            public void run(){
074                    while(running){
075                            boolean ready = false;
076                            while(!ready){
077                                    try{
078                                            // This barrier is for preventing this thread from obtaining the monitor we need for
079                                            // registering a channel. The select() call obtains several monitors and then
080                                            // blocks. Effectively this causes just about every method of the selector (and
081                                            // associated objects) to block indefinately when the select() call is in progress!
082                                            // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
083                                            // community projects ... yeah NOT!
084                                            selectionPreventionLatch.await();
085                                            ready = true;
086                                    }catch(InterruptedException irex){
087                                            // Ignore
088                                    }
089                            }
090                            
091                            try{
092                                    selector.select(); // Wait till we got stuff to do or get woken up.
093                            }catch(IOException ioex){
094                                    LoggerFactory.log("An exception occured during the select call in the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
095                            }
096    
097                            Set<SelectionKey> readyKeys = selector.selectedKeys();
098                            Iterator<SelectionKey> iterator = readyKeys.iterator();
099                            while(iterator.hasNext()){
100                                    SelectionKey key = iterator.next();
101                                    iterator.remove();
102    
103                                    try{
104                                            if(key.isReadable()){
105                                                    read(key);
106                                            }
107                                            if(key.isValid() && key.isWritable()){
108                                                    write(key);
109                                            }
110                                    }catch(RuntimeException rex){
111                                            // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
112                                            LoggerFactory.log("A runtime exception occured during the execution of the read multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
113                                            connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
114                                    }
115                            }
116                    }
117    
118                    // Close the selector once this thread stops running.
119                    try{
120                            selector.close();
121                    }catch(IOException ioex){
122                            LoggerFactory.log("Unable to close the selector of the read / write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
123                    }
124            }
125    
126            /**
127             * Notifies the with the selection key associated I/O handler that we are able to receive data.
128             * 
129             * @param key
130             *            The key associated with the channel that we can read from.
131             */
132            private void read(SelectionKey key){
133                    SelectableChannel channel = key.channel();
134                    
135                    synchronized(channel){
136                            SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
137                            ioHandler.read();
138                    }
139            }
140    
141            /**
142             * Notifies the with the selection key associated I/O handler that we are able to send data.
143             * 
144             * @param key
145             *            The key associated with the channel that we can write to.
146             */
147            private void write(SelectionKey key){
148                    SelectableChannel channel = key.channel();
149                    
150                    synchronized(registrationLock){
151                            SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
152            
153                            ioHandler.write();
154            
155                            // If there is not more data that needs to be written or the channel has been closed in the
156                            // mean time, de-register it.
157                            if(!ioHandler.hasMoreToWrite() || !channel.isOpen()) deregisterForWrite(channel);
158                    }
159            }
160    
161            /**
162             * @see IReadMultiplexer#registerForRead(SelectableChannel, SocketIOHandler)
163             */
164            public void registerForRead(SelectableChannel channel, SocketIOHandler ioHandler){
165                    selectionPreventionLatch.acquire();
166                    
167                    try{
168                            selector.wakeup();
169    
170                            synchronized(registrationLock){
171                                    try{
172                                            SelectionKey key = channel.keyFor(selector);
173                                            int interestOps = SelectionKey.OP_READ;
174                                            if(key != null) interestOps |= key.interestOps();
175                                            channel.register(selector, interestOps, ioHandler);
176                                    }catch(IOException ioex){
177                                            LoggerFactory.log("Registering a channel for reading failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
178                    
179                                            connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
180                                    }
181                            }
182                    }finally{
183                            selectionPreventionLatch.release();
184                    }
185            }
186    
187            /**
188             * @see IReadMultiplexer#deregisterForRead(SelectableChannel)
189             */
190            public void deregisterForRead(SelectableChannel channel){
191                    selectionPreventionLatch.acquire();
192                    
193                    try{
194                            selector.wakeup();
195                            
196                            synchronized(registrationLock){
197                                    SelectionKey key = channel.keyFor(selector);
198                                    if(key != null){
199                                            if(key.isValid()){
200                                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
201                    
202                                                    // Remove the attachment (if any), so it can be GCed
203                                                    if(key.interestOps() == 0) key.attach(null);
204                                            }else{
205                                                    key.cancel();
206                                                    key.attach(null);
207                                            }
208                                    }
209                            }
210                    }finally{
211                            selectionPreventionLatch.release();
212                    }
213            }
214    
215            /**
216             * @see IWriteMultiplexer#registerForWrite(SelectableChannel, SocketIOHandler)
217             */
218            public void registerForWrite(SelectableChannel channel, SocketIOHandler ioHandler){
219                    selectionPreventionLatch.acquire();
220                    
221                    try{
222                            selector.wakeup();
223                            
224                            synchronized(registrationLock){
225                                    try{
226                                            SelectionKey key = channel.keyFor(selector);
227                                            int interestOps = SelectionKey.OP_WRITE;
228                                            if(key != null) interestOps |= key.interestOps();
229                                            channel.register(selector, interestOps, ioHandler);
230                                    }catch(IOException ioex){
231                                            LoggerFactory.log("Registering a channel for writing failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
232                    
233                                            connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
234                                    }
235                            }
236                    }finally{
237                            selectionPreventionLatch.release();
238                    }
239            }
240    
241            /**
242             * @see IWriteMultiplexer#deregisterForWrite(SelectableChannel)
243             */
244            public void deregisterForWrite(SelectableChannel channel){
245                    selectionPreventionLatch.acquire();
246                    
247                    try{
248                            selector.wakeup();
249                            
250                            synchronized(registrationLock){
251                                    SelectionKey key = channel.keyFor(selector);
252                                    if(key != null){
253                                            if(key.isValid()){
254                                                    key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
255                    
256                                                    // Remove the attachment (if any), so it can be GCed
257                                                    if(key.interestOps() == 0) key.attach(null);
258                                            }else{
259                                                    key.cancel();
260                                                    key.attach(null);
261                                            }
262                                    }
263                            }
264                    }finally{
265                            selectionPreventionLatch.release();
266                    }
267            }
268    }