001    package toolbus;
002    
003    import java.io.IOException;
004    import java.net.InetSocketAddress;
005    import java.net.ServerSocket;
006    import java.net.Socket;
007    import java.net.SocketException;
008    import java.nio.ByteBuffer;
009    import java.nio.channels.ServerSocketChannel;
010    import java.nio.channels.SocketChannel;
011    import jjtraveler.VisitFailure;
012    import toolbus.communication.AbstractConnectionHandler;
013    import toolbus.communication.SocketIOHandler;
014    import toolbus.communication.SocketReadMultiplexer;
015    import toolbus.communication.SocketWriteMultiplexer;
016    import toolbus.exceptions.ToolBusException;
017    import toolbus.logging.ILogger;
018    import toolbus.logging.IToolBusLoggerConstants;
019    import toolbus.logging.LoggerFactory;
020    import toolbus.tool.ToolDefinition;
021    import toolbus.tool.ToolInstance;
022    import aterm.ATerm;
023    import aterm.ATermAppl;
024    import aterm.pure.PureFactory;
025    import aterm.pure.binary.BinaryReader;
026    import aterm.pure.binary.BinaryWriter;
027    
028    /**
029     * This class handles the establishing and closing of all TCP/IP connections.
030     * 
031     * @author Arnold Lankamp
032     */
033    public class SocketConnectionHandler extends AbstractConnectionHandler implements Runnable{
034            private final static int HANDSHAKEBUFFERSIZE = 4096;
035            
036            private final ToolBus toolbus;
037    
038            private final SocketReadMultiplexer readMultiplexer;
039            private final SocketWriteMultiplexer writeMultiplexer;
040    
041            private final ServerSocketChannel serverSocketChannel;
042            private final ByteBuffer handShakeBuffer;
043            private volatile boolean running = false;
044    
045            /**
046             * Constructor.
047             * 
048             * @param toolbus
049             *            A reference to the main class of the Toolbus.
050             */
051            public SocketConnectionHandler(ToolBus toolbus){
052                    super();
053                    
054                    this.toolbus = toolbus;
055                    
056                    try{
057                            serverSocketChannel = ServerSocketChannel.open();
058                    }catch(IOException ioex){
059                            LoggerFactory.log("Unable open a server socket.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
060                            throw new RuntimeException(ioex);
061                    }
062                    
063                    readMultiplexer = new SocketReadMultiplexer(this);
064                    writeMultiplexer = new SocketWriteMultiplexer(this);
065    
066                    handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
067                    
068                    running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
069            }
070    
071            /**
072             * Initializes the connection handler. Initialization MUST occur before executing the run method.
073             * 
074             * @throws IOException
075             */
076            public void initialize() throws IOException{
077                    ServerSocket serverSocket = serverSocketChannel.socket();
078                    serverSocket.bind(null);
079                    serverSocketChannel.configureBlocking(true);
080                    
081                    Thread readThread = new Thread(readMultiplexer);
082                    readThread.setName("Read multiplexer");
083                    readThread.start();
084    
085                    Thread writeThread = new Thread(writeMultiplexer);
086                    writeThread.setName("Write multiplexer");
087                    writeThread.start();
088            }
089            
090            /**
091             * Initializes the connection handler on a user specified port. Initialization MUST occur before
092             * executing the run method.
093             * 
094             * @throws IOException
095             */
096            public void initialize(int port) throws IOException{
097                    ServerSocket serverSocket = serverSocketChannel.socket();
098                    serverSocket.bind(new InetSocketAddress(port));
099                    serverSocketChannel.configureBlocking(true);
100                    
101                    Thread readThread = new Thread(readMultiplexer);
102                    readThread.setName("Read multiplexer");
103                    readThread.start();
104    
105                    Thread writeThread = new Thread(writeMultiplexer);
106                    writeThread.setName("Write multiplexer");
107                    writeThread.start();
108            }
109            
110            /**
111             * Returns the port number the ToolBus is currently running on.
112             * 
113             * @return The port number the ToolBus is currently running on.
114             */
115            public int getPort(){
116                    return serverSocketChannel.socket().getLocalPort();
117            }
118    
119            /**
120             * @see AbstractConnectionHandler#getReadMultiplexer()
121             */
122            public SocketReadMultiplexer getReadMultiplexer(){
123                    return readMultiplexer;
124            }
125    
126            /**
127             * @see AbstractConnectionHandler#getWriteMultiplexer()
128             */
129            public SocketWriteMultiplexer getWriteMultiplexer(){
130                    return writeMultiplexer;
131            }
132    
133            /**
134             * Checks if this handler is running or not.
135             * 
136             * @return True if this handler is running, false otherwise.
137             */
138            public boolean isRunning(){
139                    return running;
140            }
141    
142            /**
143             * Stops the execution of this handler. The running state will be set to false and the server
144             * socket will be closed.
145             */
146            public void stopRunning(){
147                    running = false;
148    
149                    try{
150                            ServerSocket serverSocket = serverSocketChannel.socket();
151                            serverSocket.close();
152                    }catch(IOException ioex){
153                            LoggerFactory.log("An error occured while shutting down the connection handler.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
154                    }
155                    
156                    readMultiplexer.stopRunning();
157                    writeMultiplexer.stopRunning();
158            }
159    
160            /**
161             * Main loop of this handler. It tries to accept any incoming connection attempt.
162             */
163            public void run(){
164                    while(running){
165                            try{
166                                    acceptConnection();
167                            }catch(IOException ioex){
168                                    if(running) LoggerFactory.log("An error occured while accepting a connection.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
169                                    running = false; // temporary fix to avoid a loop
170                            }
171                    }
172            }
173    
174            /**
175             * Accepts a connection with a remote application.
176             * 
177             * @throws IOException
178             *             Thrown when something goes wrong while accepting the connection, registering it
179             *             or instantiating the associated I/O handlers.
180             */
181            private void acceptConnection() throws IOException{
182                    SocketChannel socketChannel = serverSocketChannel.accept();
183    
184                    Socket socket = socketChannel.socket();
185                    // Disable Nagle's algorithm, we don't want the random 500ms delays.
186                    socket.setTcpNoDelay(true);
187                    try{
188                            // Set the traffic class to high throughput and low delay.
189                            socket.setTrafficClass(0x18);
190                    }catch(SocketException sex){
191                            // This catch block is only here because some operating systems have a problem with setting priorities.
192                    }
193    
194                    ToolInstance toolInstance = null;
195                    try{
196                            toolInstance = shakeHands(socketChannel);
197                    }catch(IOException ioex){
198                            LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
199                            closeConnection(socketChannel);
200                            return;
201                    }catch(ToolBusException tbex){
202                            LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
203                            closeConnection(socketChannel);
204                            return;
205                    }catch(RuntimeException rex){
206                            LoggerFactory.log("An runtime exception occured while executing a handshake with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
207                            closeConnection(socketChannel);
208                            return;
209                    }
210    
211                    if(toolInstance == null){
212                            LoggerFactory.log("Tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ", didn't supply the the expected interface.", ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
213                            closeConnection(socketChannel);
214                            return;
215                    }
216    
217                    socketChannel.configureBlocking(false);
218    
219                    // Set up handlers
220                    SocketIOHandler ioHandler = new SocketIOHandler(toolInstance, this, socketChannel);
221                    toolInstance.setIOHandler(ioHandler);
222    
223                    readMultiplexer.registerForRead(socketChannel, ioHandler);
224            }
225    
226            private ToolInstance shakeHands(SocketChannel socketChannel) throws IOException, ToolBusException{
227                    socketChannel.configureBlocking(true);
228                    
229                    // Receive tool id.
230                    ATermAppl toolKey = (ATermAppl) readTermFromChannel(toolbus.getTBTermFactory(), socketChannel, handShakeBuffer);
231                    String toolName = toolKey.getAFun().getName();
232                    
233                    ToolDefinition toolDef = toolbus.getToolDefinition(toolName);
234                    if(toolDef == null){
235                            String error = "No toolDef found for tool with name: " + toolName;
236                            LoggerFactory.log(error, ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
237                            throw new RuntimeException(error);
238                    }
239    
240                    ToolInstanceManager toolInstanceManager = toolbus.getToolInstanceManager();
241    
242                    ToolInstance toolInstance = toolInstanceManager.getPendingTool(toolKey);
243                    
244                    // If we didn't request the tool with the given id to execute, it's connecting on it's own
245                    // initiative.
246                    if(toolInstance == null){
247                            toolInstance = new ToolInstance(toolDef, toolbus);
248                            toolInstanceManager.addDynamiclyConnectedTool(toolInstance);
249    
250                            LoggerFactory.log("Tool: " + toolInstance.getToolKey() + ", connected at its own initiative.", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
251                    }
252    
253                    // Send the signature.
254                    writeTermToChannel(toolDef.getSignature(), socketChannel, handShakeBuffer);
255    
256                    // Receive signature confirmation.
257                    handShakeBuffer.clear();
258                    handShakeBuffer.limit(1);
259                    socketChannel.read(handShakeBuffer);
260                    handShakeBuffer.flip();
261                    byte signatureConfirmation = handShakeBuffer.get();
262                    if(signatureConfirmation != 1){// Not OK
263                            toolInstance.kill();
264                            toolInstance = null;
265                    }else{
266                            // Send the (permanent) tool key back to the tool.
267                            writeTermToChannel(toolInstance.getToolKey(), socketChannel, handShakeBuffer);
268                    }
269                    
270                    return toolInstance;
271            }
272    
273            /**
274             * Transmits the term to the given socket channel, using the given buffer.
275             * 
276             * @param aTerm
277             *            The term to write.
278             * @param socketChannel
279             *            The channel to write to.
280             * @param byteBuffer
281             *            The buffer to use.
282             * @throws IOException
283             *             Thrown when something goes wrong while writing to the channel.
284             */
285            private void writeTermToChannel(ATerm aTerm, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
286                    BinaryWriter binaryWriter = new BinaryWriter(aTerm);
287                    while(!binaryWriter.isFinished()){
288                            byteBuffer.clear();
289                            byteBuffer.position(2);
290                            try{
291                                    binaryWriter.serialize(byteBuffer);
292                            }catch(VisitFailure vf){
293                                    // Bogus catch block, this can't happen.
294                            }
295                            // Insert chunk size data.
296                            int chunkSize = byteBuffer.limit() - 2;
297                            byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
298                            byteBuffer.put(1, (byte) ((chunkSize & 0x0000FF00) >> 8));
299                            
300                            // Write chunk
301                            socketChannel.write(byteBuffer);
302                    }
303            }
304    
305            /**
306             * Reads a term from the given channel, using the given buffer.
307             * 
308             * @param factory
309             *            The factory to use for parsing the term.
310             * @param socketChannel
311             *            The channel to read from.
312             * @param byteBuffer
313             *            The buffer to use.
314             * @return The term that was read.
315             * @throws IOException
316             *             Thrown when something goes wrong while reading the term from the channel.
317             */
318            private ATerm readTermFromChannel(PureFactory factory, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
319                    BinaryReader binaryReader = new BinaryReader(factory);
320                    while(!binaryReader.isDone()){
321                            byteBuffer.clear();
322                            byteBuffer.limit(2);
323                            socketChannel.read(byteBuffer);
324                            byteBuffer.flip();
325                            
326                            int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
327                            byteBuffer.clear();
328                            byteBuffer.limit(chunkSize);
329                            socketChannel.read(byteBuffer);
330                            byteBuffer.flip();
331                            
332                            binaryReader.deserialize(byteBuffer);
333                    }
334    
335                    return binaryReader.getRoot();
336            }
337    
338            /**
339             * @see AbstractConnectionHandler#closeConnection(SocketChannel)
340             */
341            public void closeConnection(SocketChannel socketChannel){
342                    readMultiplexer.deregisterForRead(socketChannel);
343                    writeMultiplexer.deregisterForWrite(socketChannel);
344                    
345                    Socket socket = socketChannel.socket();
346                    LoggerFactory.log("Closing connection with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
347    
348                    // Close the in- output stream of the socket to ensure that the file descriptors are closed
349                    // immidiately and NOT whenever the JVM feels like it.
350                    try{
351                            if(!socket.isInputShutdown()) socket.shutdownInput();
352                    }catch(IOException ioex){
353                            // Ignore
354                    }
355                    try{
356                            if(!socket.isOutputShutdown()) socket.shutdownOutput();
357                    }catch(IOException ioex){
358                            // Ignore
359                    }
360    
361                    try{
362                            if(!socket.isClosed()) socket.close();
363                    }catch(IOException ioex){
364                            LoggerFactory.log("Failed to close the socket with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
365                    }
366                    
367                    try{
368                            if(socketChannel.isOpen()) socketChannel.close();
369                    }catch(IOException ioex){
370                            LoggerFactory.log("Failed to close the socket channel with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
371                    }
372            }
373    }