001    package toolbus.communication;
002    
003    import java.io.IOException;
004    import java.nio.channels.SelectableChannel;
005    import java.nio.channels.SelectionKey;
006    import java.nio.channels.Selector;
007    import java.nio.channels.SocketChannel;
008    import java.util.Iterator;
009    import java.util.Set;
010    
011    import toolbus.logging.ILogger;
012    import toolbus.logging.IToolBusLoggerConstants;
013    import toolbus.logging.LoggerFactory;
014    import toolbus.util.concurrency.Latch;
015    
016    /**
017     * This class handles the multiplexing of socket channels that we are registered for write
018     * operations.
019     * 
020     * @author Arnold Lankamp
021     */
022    public class SocketWriteMultiplexer implements IWriteMultiplexer, Runnable{
023            private final Latch selectionPreventionLatch = new Latch();
024            
025            private final AbstractConnectionHandler connectionHandler;
026            private final Selector selector;
027            private volatile boolean running = false;
028    
029            /**
030             * Constructor.
031             * 
032             * @param connectionHandler
033             *            The connection handler this write multiplexer was created by.
034             */
035            public SocketWriteMultiplexer(AbstractConnectionHandler connectionHandler){
036                    super();
037    
038                    this.connectionHandler = connectionHandler;
039    
040                    try{
041                            selector = Selector.open();
042                    }catch(IOException ioex){
043                            LoggerFactory.log("Unable to create a selector for the write multiplexer.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
044                            throw new RuntimeException(ioex);
045                    }
046                    
047                    running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
048            }
049    
050            /**
051             * Checks if this multiplexer is running or not.
052             * 
053             * @return True if this handler is running, false otherwise.
054             */
055            public boolean isRunning(){
056                    return running;
057            }
058    
059            /**
060             * Stops the execution of this multiplexer after then next iteration. The running state will be
061             * set to false and the selector will be forced to return directly.
062             */
063            public void stopRunning(){
064                    running = false;
065                    // Let the selector return immidiately
066                    selector.wakeup();
067            }
068    
069            /**
070             * The main loop of this multiplexer. It blocks until there is a registered channel which we are
071             * able to write to.
072             */
073            public void run(){
074                    while(running){
075                            boolean ready = false;
076                            while(!ready){
077                                    try{
078                                            // This barrier is for preventing this thread from obtaining the monitor we need for
079                                            // registering a channel. The select() call obtains several monitors and then
080                                            // blocks. Effectively this causes just about every method of the selector (and
081                                            // associated objects) to block indefinately when the select() call is in progress!
082                                            // Thanks Sun for this (incredibly) crappy design / implementation. Long live the
083                                            // community projects ... yeah NOT!
084                                            selectionPreventionLatch.await();
085                                            ready = true;
086                                    }catch(InterruptedException irex){
087                                            // Ignore
088                                    }
089                            }
090                            
091                            try{
092                                    selector.select(); // Wait till we got stuff to do or get woken up.
093                            }catch(IOException ioex){
094                                    LoggerFactory.log("An exception occured during the select call in the write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
095                            }
096                            
097                            Set<SelectionKey> readyKeys = selector.selectedKeys();
098                            Iterator<SelectionKey> iterator = readyKeys.iterator();
099                            while(iterator.hasNext()){
100                                    SelectionKey key = iterator.next();
101                                    iterator.remove();
102    
103                                    if(key.isWritable()){
104                                            try{
105                                                    write(key);
106                                            }catch(RuntimeException rex){
107                                                    // Catch all RuntimeExceptions. We don't want to risk the termination of this thread.
108                                                    LoggerFactory.log("A runtime exception occured during the execution of the write multiplexer; killing associated connection.", rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
109                                                    connectionHandler.closeDueToException((SocketChannel) key.channel(), (SocketIOHandler) key.attachment());
110                                            }
111                                    }
112                            }
113                    }
114    
115                    // Close the selector once this thread stops running.
116                    try{
117                            selector.close();
118                    }catch(IOException ioex){
119                            LoggerFactory.log("Unable to close the selector of the write multiplexer.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
120                    }
121            }
122    
123            /**
124             * Notifies the with the selection key associated I/O handler that we are able to send data.
125             * 
126             * @param key
127             *            The key associated with the channel that we can write to.
128             */
129            private void write(SelectionKey key){
130                    SelectableChannel channel = key.channel();
131                    
132                    synchronized(channel){
133                            SocketIOHandler ioHandler = (SocketIOHandler) key.attachment();
134            
135                            ioHandler.write();
136            
137                            // If there is not more data that needs to be written or the channel has been closed in the
138                            // mean time, de-register it.
139                            if(!ioHandler.hasMoreToWrite() || !channel.isOpen()) deregisterForWrite(channel);
140                    }
141            }
142    
143            /**
144             * @see IWriteMultiplexer#registerForWrite(SelectableChannel, SocketIOHandler)
145             */
146            public void registerForWrite(SelectableChannel channel, SocketIOHandler ioHandler){
147                    selectionPreventionLatch.acquire();
148                    
149                    try{
150                            selector.wakeup();
151    
152                            synchronized(channel){
153                                    try{
154                                            SelectionKey key = channel.keyFor(selector);
155                                            if(key == null){
156                                                    channel.register(selector, SelectionKey.OP_WRITE, ioHandler);
157                                            }else{
158                                                    key.interestOps(SelectionKey.OP_WRITE);
159                                                    key.attach(ioHandler);
160                                            }
161                                    }catch(IOException ioex){
162                                            LoggerFactory.log("Registering a channel for writing failed", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
163                    
164                                            connectionHandler.closeDueToException((SocketChannel) channel, ioHandler);
165                                    }
166                            }
167                    }finally{
168                            selectionPreventionLatch.release();
169                    }
170            }
171    
172            /**
173             * @see IWriteMultiplexer#deregisterForWrite(SelectableChannel)
174             */
175            public void deregisterForWrite(SelectableChannel channel){
176                    selectionPreventionLatch.acquire();
177                    
178                    try{
179                            selector.wakeup();
180                            
181                            synchronized(channel){
182                                    SelectionKey key = channel.keyFor(selector);
183                                    if(key != null){
184                                            if(key.isValid()) key.interestOps(0);
185                                            else key.cancel();
186                    
187                                            // Remove the attachment (if any), so it can be GCed
188                                            key.attach(null);
189                                    }
190                            }
191                    }finally{
192                            selectionPreventionLatch.release();
193                    }
194            }
195    }