001    package toolbus.adapter;
002    
003    import java.io.IOException;
004    import java.net.ConnectException;
005    import java.net.InetAddress;
006    import java.net.InetSocketAddress;
007    import java.net.Socket;
008    import java.net.SocketException;
009    import java.nio.ByteBuffer;
010    import java.nio.channels.SocketChannel;
011    
012    import jjtraveler.VisitFailure;
013    import toolbus.communication.AbstractConnectionHandler;
014    import toolbus.communication.IReadMultiplexer;
015    import toolbus.communication.IWriteMultiplexer;
016    import toolbus.communication.SocketIOHandler;
017    import toolbus.communication.SocketReadWriteMultiplexer;
018    import toolbus.logging.ILogger;
019    import toolbus.logging.IToolBusLoggerConstants;
020    import toolbus.logging.LoggerFactory;
021    import aterm.ATerm;
022    import aterm.ATermInt;
023    import aterm.pure.PureFactory;
024    import aterm.pure.binary.BinaryReader;
025    import aterm.pure.binary.BinaryWriter;
026    
027    /**
028     * This class handles the opening and closing of the tool's connection with the ToolBus and provides
029     * access to the multiplexers that are handling network I/O.
030     * 
031     * @author Arnold Lankamp
032     */
033    public class ToolConnectionHandler extends AbstractConnectionHandler implements Runnable{
034            private final static int HANDSHAKEBUFFERSIZE = 4096;
035    
036            private final SocketReadWriteMultiplexer socketReadWriteMultiplexer;
037    
038            /**
039             * Constructor.
040             * 
041             * @param toolBridge
042             *            The tool bridge that requested the starting of this tool connection.
043             * @param host
044             *            The adress where the ToolBus is running.
045             * @param port
046             *            The port the ToolBus is running on.
047             */
048            public ToolConnectionHandler(ToolBridge toolBridge, InetAddress host, int port){
049                    super();
050    
051                    SocketChannel socketChannel = null;
052                    try{
053                            socketChannel = SocketChannel.open();
054                            Socket socket = socketChannel.socket();
055                            // Disable Nagle's algorithm, we don't want the random 500ms delays.
056                            socket.setTcpNoDelay(true);
057                            try{
058                                    // Set the traffic class to high throughput and low delay.
059                                    socket.setTrafficClass(0x18);
060                            }catch(SocketException sex){
061                                    // This catch block is only here because some operating systems have a problem with setting priorities.
062                            }
063    
064                            socketChannel.connect(new InetSocketAddress(host, port));
065                            socketChannel.configureBlocking(true);
066    
067                            shakeHands(socketChannel, toolBridge);
068    
069                            socketChannel.configureBlocking(false);
070                    }catch(ConnectException cex){
071                            LoggerFactory.log("Unable to connect with the ToolBus.", cex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
072                            throw new RuntimeException(cex);
073                    }catch(IOException ioex){
074                            LoggerFactory.log("Unable to establish a connection with the ToolBus.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
075                            closeConnection(socketChannel);
076                            throw new RuntimeException(ioex);
077                    }catch(RuntimeException rex){
078                            LoggerFactory.log("A runtime exception occured while trying to connect with the ToolBus.", rex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
079                            closeConnection(socketChannel);
080                            throw new RuntimeException(rex);
081                    }
082    
083                    socketReadWriteMultiplexer = new SocketReadWriteMultiplexer(this);
084                    SocketIOHandler socketIOHandler = new SocketIOHandler(toolBridge, this, socketChannel);
085                    toolBridge.setIOHandler(socketIOHandler);
086    
087                    socketReadWriteMultiplexer.registerForRead(socketChannel, socketIOHandler);
088            }
089    
090            /**
091             * Starts the multiplexer.
092             * 
093             * @see Runnable#run()
094             */
095            public void run(){
096                    Thread multiplexerThread = new Thread(socketReadWriteMultiplexer);
097                    multiplexerThread.setName("Multiplexer");
098                    multiplexerThread.start();
099            }
100    
101            /**
102             * @see AbstractConnectionHandler#getReadMultiplexer()
103             */
104            public IReadMultiplexer getReadMultiplexer(){
105                    return socketReadWriteMultiplexer;
106            }
107    
108            /**
109             * @see AbstractConnectionHandler#getWriteMultiplexer()
110             */
111            public IWriteMultiplexer getWriteMultiplexer(){
112                    return socketReadWriteMultiplexer;
113            }
114    
115            /**
116             * Negotiates with the ToolBus (about some stuff) and checks if the tool adheres to the expected
117             * interface.
118             * 
119             * @param socketChannel
120             *            The socket channel that we need to use to perform the handshake.
121             * @param toolBridge
122             *            The tool bridge that provides access to the tool.
123             * @throws IOException
124             *             Thrown when something goes wrong with the connection during the handshake.
125             */
126            private void shakeHands(SocketChannel socketChannel, ToolBridge toolBridge) throws IOException{
127                    ByteBuffer handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
128    
129                    PureFactory factory = toolBridge.getFactory();
130                    ATerm toolKey = factory.makeAppl(factory.makeAFun(toolBridge.getToolName(), 1, false), factory.makeInt(toolBridge.getToolID()));
131    
132                    // Send the tool id.
133                    writeTermToChannel(toolKey, socketChannel, handShakeBuffer);
134    
135                    // Receive signature.
136                    handShakeBuffer.clear();
137                    ATerm signatures = readTermFromChannel(factory, socketChannel, handShakeBuffer);
138    
139                    // Check the signature.
140                    boolean sigOK = toolBridge.checkSignature(signatures);
141                    byte sigOKByte = 0;
142                    if(sigOK) sigOKByte = 1;
143    
144                    // Send signature confirmation.
145                    handShakeBuffer.clear();
146                    handShakeBuffer.limit(1);
147                    handShakeBuffer.put(sigOKByte);
148                    handShakeBuffer.flip();
149                    socketChannel.write(handShakeBuffer);
150    
151                    if(sigOKByte != 1){
152                            LoggerFactory.log("Signatures didn't match tool interface. Disconnecting ....", ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
153                            throw new RuntimeException("Signatures didn't match tool interface. Disconnecting ....");
154                    }
155    
156                    // Receive the (permanent) tool key.
157                    ATerm newToolKey = readTermFromChannel(factory, socketChannel, handShakeBuffer);
158    
159                    int toolID = ((ATermInt) newToolKey.getChildAt(0)).getInt();
160                    toolBridge.setToolID(toolID);
161            }
162    
163            /**
164             * Transmits the term to the given socket channel, using the given buffer.
165             * 
166             * @param aTerm
167             *            The term to write.
168             * @param socketChannel
169             *            The channel to write to.
170             * @param byteBuffer
171             *            The buffer to use.
172             * @throws IOException
173             *             Thrown when something goes wrong while writing to the channel.
174             */
175            private void writeTermToChannel(ATerm aTerm, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
176                    BinaryWriter binaryWriter = new BinaryWriter(aTerm);
177                    while(!binaryWriter.isFinished()){
178                            byteBuffer.clear();
179                            byteBuffer.position(2);
180                            try{
181                                    binaryWriter.serialize(byteBuffer);
182                            }catch(VisitFailure vf){
183                                    // Bogus catch block, this can't happen.
184                            }
185                            // Insert chunk size data.
186                            int chunkSize = byteBuffer.limit() - 2;
187                            byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
188                            byteBuffer.put(1, (byte) ((chunkSize >>> 8) & 0x000000FF));
189                            
190                            // Write chunk
191                            socketChannel.write(byteBuffer);
192                    }
193            }
194    
195            /**
196             * Reads a term from the given channel, using the given buffer.
197             * 
198             * @param factory
199             *            The factory to use for parsing the term.
200             * @param socketChannel
201             *            The channel to read from.
202             * @param byteBuffer
203             *            The buffer to use.
204             * @return The term that was read.
205             * @throws IOException
206             *             Thrown when something goes wrong while reading the term from the channel.
207             */
208            private ATerm readTermFromChannel(PureFactory factory, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
209                    BinaryReader binaryReader = new BinaryReader(factory);
210                    while(!binaryReader.isDone()){
211                            byteBuffer.clear();
212                            byteBuffer.limit(2);
213                            socketChannel.read(byteBuffer);
214                            byteBuffer.flip();
215                            
216                            int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
217                            
218                            byteBuffer.clear();
219                            byteBuffer.limit(chunkSize);
220                            socketChannel.read(byteBuffer);
221                            byteBuffer.flip();
222                            
223                            binaryReader.deserialize(byteBuffer);
224                    }
225    
226                    return binaryReader.getRoot();
227            }
228    
229            /**
230             * @see AbstractConnectionHandler#closeConnection(SocketChannel)
231             */
232            public void closeConnection(SocketChannel socketChannel){
233                    if(socketReadWriteMultiplexer != null) socketReadWriteMultiplexer.stopRunning();
234    
235                    Socket socket = socketChannel.socket();
236                    
237                    // Close the socket
238                    LoggerFactory.log("Closing connection with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
239    
240                    // Close the in- output stream of the socket to ensure that the file descriptors are closed
241                    // immidiately and NOT whenever the JVM feels like it.
242                    try{
243                            if(!socket.isInputShutdown()) socket.shutdownInput();
244                    }catch(IOException ioex){
245                            // Ignore
246                    }
247                    try{
248                            if(!socket.isOutputShutdown()) socket.shutdownOutput();
249                    }catch(IOException ioex){
250                            // Ignore
251                    }
252    
253                    try{
254                            if(!socket.isClosed()) socket.close();
255                    }catch(IOException ioex){
256                            LoggerFactory.log("Failed to close the socket with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
257                    }
258            }
259    }