001    package toolbus.communication;
002    
003    import java.io.IOException;
004    import java.nio.channels.SelectableChannel;
005    import java.nio.channels.SelectionKey;
006    import java.nio.channels.Selector;
007    import java.nio.channels.SocketChannel;
008    import java.util.Iterator;
009    import java.util.Set;
010    
011    import toolbus.logging.ILogger;
012    import toolbus.logging.IToolBusLoggerConstants;
013    import toolbus.logging.LoggerFactory;
014    import toolbus.util.concurrency.Latch;
015    
016    /**
017     * This class handles the multiplexing of socket channels that we are registered for read
018     * operations.
019     * 
020     * @author Arnold Lankamp
021     */
022    public class SocketReadMultiplexer implements IReadMultiplexer, Runnable{
023            private final Latch selectionPreventionLatch = new Latch();
024            
025            private final AbstractConnectionHandler connectionHandler;
026            
027            private final Selector selector;
028            private volatile boolean running = false;
029    
030            /**
031             * Constructor.
032             * 
033             * @param connectionHandler
034             *            The connection handler this read multiplexer was created by.
035             */
036            public SocketReadMultiplexer(AbstractConnectionHandler connectionHandler){
037                    super();
038    
039                    this.connectionHandler = connectionHandler;
040    
041                    try{
042                            selector = Selector.open();
043                    }catch(IOException ioex){
044                            LoggerFactory.log("Unable to create a selector for the read multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
045                            throw new RuntimeException(ioex);
046                    }
047                    
048                    running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
049            }
050    
051            /**
052             * Checks if this multiplexer is running or not.
053             * 
054             * @return True if this handler is running, false otherwise.
055             */
056            public boolean isRunning(){
057                    return running;
058            }
059    
060            /**
061             * Stops the execution of this multiplexer after then next iteration. The running state will be
062             * set to false and the selector will be forced to return directly.
063             */
064            public void stopRunning(){
065                    running = false;
066                    // Let the selector return immidiately
067                    selector.wakeup();
068            }
069    
070            /**
071             * The main loop of this multiplexer. It blocks until there is a registered channel which we are
072             * able to read from.
073             */
074            public void run(){
075                    while(running){
076                            boolean ready = false;
077                            while(!ready){
078                                    try{
079                                            // This barrier is for preventing this thread from obtaining the monitor we need for
080                                            // registering a channel. The select() call obtains several monitors and then
081                                            // blocks. Effectively this causes just about every method of the selector (and
082                                            // associated objects) to block indefinately when the select() call is in progress!
083                                            // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
084                                            // community projects ... yeah NOT!
085                                            selectionPreventionLatch.await();
086                                            ready = true;
087                                    }catch(InterruptedException irex){
088                                            // Ignore
089                                    }
090                            }
091                            
092                            try{
093                                    selector.select(); // Wait till we got stuff to do or get woken up.
094                            }catch(IOException ioex){
095                                    LoggerFactory.log("An exception occured during the select call in the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
096                            }
097    
098                            Set<SelectionKey> readyKeys = selector.selectedKeys();
099                            Iterator<SelectionKey> iterator = readyKeys.iterator();
100                            while(iterator.hasNext()){
101                                    SelectionKey key = iterator.next();
102                                    iterator.remove();
103    
104                                    try{
105                                            if(key.isReadable()){
106                                                    read(key);
107                                            }
108                                    }catch(RuntimeException rex){
109                                            // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
110                                            LoggerFactory.log("A runtime exception occured during the execution of the read multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
111                                            connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
112                                    }
113                            }
114                    }
115    
116                    // Close the selector once this thread stops running.
117                    try{
118                            selector.close();
119                    }catch(IOException ioex){
120                            LoggerFactory.log("Unable to close the selector of the read multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
121                    }
122            }
123    
124            /**
125             * Notifies the with the selection key associated I/O handler that we are able to receive data.
126             * 
127             * @param key
128             *            The key associated with the channel that we can read from.
129             */
130            private void read(SelectionKey key){
131                    SelectableChannel channel = key.channel();
132                    
133                    synchronized(channel){
134                            SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
135                            ioHandler.read();
136                    }
137            }
138    
139            /**
140             * @see IReadMultiplexer#registerForRead(SelectableChannel, SocketIOHandler)
141             */
142            public void registerForRead(SelectableChannel channel, SocketIOHandler ioHandler){
143                    selectionPreventionLatch.acquire();
144                    
145                    try{
146                            selector.wakeup();
147            
148                            synchronized(channel){
149                                    try{
150                                            SelectionKey key = channel.keyFor(selector);
151                                            if(key == null){
152                                                    channel.register(selector, SelectionKey.OP_READ, ioHandler);
153                                            }else{
154                                                    key.interestOps(SelectionKey.OP_READ);
155                                                    key.attach(ioHandler);
156                                            }
157                                    }catch(IOException ioex){
158                                            LoggerFactory.log("Registering a channel for reading failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
159                    
160                                            connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
161                                    }
162                            }
163                    }finally{
164                            selectionPreventionLatch.release();
165                    }
166            }
167    
168            /**
169             * @see IReadMultiplexer#deregisterForRead(SelectableChannel)
170             */
171            public void deregisterForRead(SelectableChannel channel){
172                    selectionPreventionLatch.acquire();
173                    
174                    try{
175                            selector.wakeup();
176            
177                            synchronized(channel){
178                                    SelectionKey key = channel.keyFor(selector);
179                                    
180                                    if(key != null){
181                                            if(key.isValid()) key.interestOps(0);
182                                            else key.cancel();
183                    
184                                            // Remove the attachment (if any), so it can be GCed
185                                            key.attach(null);
186                                    }
187                            }
188                    }finally{
189                            selectionPreventionLatch.release();
190                    }
191            }
192    }