001 package toolbus.adapter;
002
003 import java.io.IOException;
004 import java.net.ConnectException;
005 import java.net.InetAddress;
006 import java.net.InetSocketAddress;
007 import java.net.Socket;
008 import java.net.SocketException;
009 import java.nio.ByteBuffer;
010 import java.nio.channels.SocketChannel;
011
012 import jjtraveler.VisitFailure;
013 import toolbus.communication.AbstractConnectionHandler;
014 import toolbus.communication.IReadMultiplexer;
015 import toolbus.communication.IWriteMultiplexer;
016 import toolbus.communication.SocketIOHandler;
017 import toolbus.communication.SocketReadWriteMultiplexer;
018 import toolbus.logging.ILogger;
019 import toolbus.logging.IToolBusLoggerConstants;
020 import toolbus.logging.LoggerFactory;
021 import aterm.ATerm;
022 import aterm.ATermInt;
023 import aterm.pure.PureFactory;
024 import aterm.pure.binary.BinaryReader;
025 import aterm.pure.binary.BinaryWriter;
026
027 /**
028 * This class handles the opening and closing of the tool's connection with the ToolBus and provides
029 * access to the multiplexers that are handling network I/O.
030 *
031 * @author Arnold Lankamp
032 */
033 public class ToolConnectionHandler extends AbstractConnectionHandler implements Runnable{
034 private final static int HANDSHAKEBUFFERSIZE = 4096;
035
036 private final SocketReadWriteMultiplexer socketReadWriteMultiplexer;
037
038 /**
039 * Constructor.
040 *
041 * @param toolBridge
042 * The tool bridge that requested the starting of this tool connection.
043 * @param host
044 * The adress where the ToolBus is running.
045 * @param port
046 * The port the ToolBus is running on.
047 */
048 public ToolConnectionHandler(ToolBridge toolBridge, InetAddress host, int port){
049 super();
050
051 SocketChannel socketChannel = null;
052 try{
053 socketChannel = SocketChannel.open();
054 Socket socket = socketChannel.socket();
055 // Disable Nagle's algorithm, we don't want the random 500ms delays.
056 socket.setTcpNoDelay(true);
057 try{
058 // Set the traffic class to high throughput and low delay.
059 socket.setTrafficClass(0x18);
060 }catch(SocketException sex){
061 // This catch block is only here because some operating systems have a problem with setting priorities.
062 }
063
064 socketChannel.connect(new InetSocketAddress(host, port));
065 socketChannel.configureBlocking(true);
066
067 shakeHands(socketChannel, toolBridge);
068
069 socketChannel.configureBlocking(false);
070 }catch(ConnectException cex){
071 LoggerFactory.log("Unable to connect with the ToolBus.", cex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
072 throw new RuntimeException(cex);
073 }catch(IOException ioex){
074 LoggerFactory.log("Unable to establish a connection with the ToolBus.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
075 closeConnection(socketChannel);
076 throw new RuntimeException(ioex);
077 }catch(RuntimeException rex){
078 LoggerFactory.log("A runtime exception occured while trying to connect with the ToolBus.", rex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
079 closeConnection(socketChannel);
080 throw new RuntimeException(rex);
081 }
082
083 socketReadWriteMultiplexer = new SocketReadWriteMultiplexer(this);
084 SocketIOHandler socketIOHandler = new SocketIOHandler(toolBridge, this, socketChannel);
085 toolBridge.setIOHandler(socketIOHandler);
086
087 socketReadWriteMultiplexer.registerForRead(socketChannel, socketIOHandler);
088 }
089
090 /**
091 * Starts the multiplexer.
092 *
093 * @see Runnable#run()
094 */
095 public void run(){
096 Thread multiplexerThread = new Thread(socketReadWriteMultiplexer);
097 multiplexerThread.setName("Multiplexer");
098 multiplexerThread.start();
099 }
100
101 /**
102 * @see AbstractConnectionHandler#getReadMultiplexer()
103 */
104 public IReadMultiplexer getReadMultiplexer(){
105 return socketReadWriteMultiplexer;
106 }
107
108 /**
109 * @see AbstractConnectionHandler#getWriteMultiplexer()
110 */
111 public IWriteMultiplexer getWriteMultiplexer(){
112 return socketReadWriteMultiplexer;
113 }
114
115 /**
116 * Negotiates with the ToolBus (about some stuff) and checks if the tool adheres to the expected
117 * interface.
118 *
119 * @param socketChannel
120 * The socket channel that we need to use to perform the handshake.
121 * @param toolBridge
122 * The tool bridge that provides access to the tool.
123 * @throws IOException
124 * Thrown when something goes wrong with the connection during the handshake.
125 */
126 private void shakeHands(SocketChannel socketChannel, ToolBridge toolBridge) throws IOException{
127 ByteBuffer handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
128
129 PureFactory factory = toolBridge.getFactory();
130 ATerm toolKey = factory.makeAppl(factory.makeAFun(toolBridge.getToolName(), 1, false), factory.makeInt(toolBridge.getToolID()));
131
132 // Send the tool id.
133 writeTermToChannel(toolKey, socketChannel, handShakeBuffer);
134
135 // Receive signature.
136 handShakeBuffer.clear();
137 ATerm signatures = readTermFromChannel(factory, socketChannel, handShakeBuffer);
138
139 // Check the signature.
140 boolean sigOK = toolBridge.checkSignature(signatures);
141 byte sigOKByte = 0;
142 if(sigOK) sigOKByte = 1;
143
144 // Send signature confirmation.
145 handShakeBuffer.clear();
146 handShakeBuffer.limit(1);
147 handShakeBuffer.put(sigOKByte);
148 handShakeBuffer.flip();
149 socketChannel.write(handShakeBuffer);
150
151 if(sigOKByte != 1){
152 LoggerFactory.log("Signatures didn't match tool interface. Disconnecting ....", ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
153 throw new RuntimeException("Signatures didn't match tool interface. Disconnecting ....");
154 }
155
156 // Receive the (permanent) tool key.
157 ATerm newToolKey = readTermFromChannel(factory, socketChannel, handShakeBuffer);
158
159 int toolID = ((ATermInt) newToolKey.getChildAt(0)).getInt();
160 toolBridge.setToolID(toolID);
161 }
162
163 /**
164 * Transmits the term to the given socket channel, using the given buffer.
165 *
166 * @param aTerm
167 * The term to write.
168 * @param socketChannel
169 * The channel to write to.
170 * @param byteBuffer
171 * The buffer to use.
172 * @throws IOException
173 * Thrown when something goes wrong while writing to the channel.
174 */
175 private void writeTermToChannel(ATerm aTerm, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
176 BinaryWriter binaryWriter = new BinaryWriter(aTerm);
177 while(!binaryWriter.isFinished()){
178 byteBuffer.clear();
179 byteBuffer.position(2);
180 try{
181 binaryWriter.serialize(byteBuffer);
182 }catch(VisitFailure vf){
183 // Bogus catch block, this can't happen.
184 }
185 // Insert chunk size data.
186 int chunkSize = byteBuffer.limit() - 2;
187 byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
188 byteBuffer.put(1, (byte) ((chunkSize >>> 8) & 0x000000FF));
189
190 // Write chunk
191 socketChannel.write(byteBuffer);
192 }
193 }
194
195 /**
196 * Reads a term from the given channel, using the given buffer.
197 *
198 * @param factory
199 * The factory to use for parsing the term.
200 * @param socketChannel
201 * The channel to read from.
202 * @param byteBuffer
203 * The buffer to use.
204 * @return The term that was read.
205 * @throws IOException
206 * Thrown when something goes wrong while reading the term from the channel.
207 */
208 private ATerm readTermFromChannel(PureFactory factory, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
209 BinaryReader binaryReader = new BinaryReader(factory);
210 while(!binaryReader.isDone()){
211 byteBuffer.clear();
212 byteBuffer.limit(2);
213 socketChannel.read(byteBuffer);
214 byteBuffer.flip();
215
216 int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
217
218 byteBuffer.clear();
219 byteBuffer.limit(chunkSize);
220 socketChannel.read(byteBuffer);
221 byteBuffer.flip();
222
223 binaryReader.deserialize(byteBuffer);
224 }
225
226 return binaryReader.getRoot();
227 }
228
229 /**
230 * @see AbstractConnectionHandler#closeConnection(SocketChannel)
231 */
232 public void closeConnection(SocketChannel socketChannel){
233 if(socketReadWriteMultiplexer != null) socketReadWriteMultiplexer.stopRunning();
234
235 Socket socket = socketChannel.socket();
236
237 // Close the socket
238 LoggerFactory.log("Closing connection with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
239
240 // Close the in- output stream of the socket to ensure that the file descriptors are closed
241 // immidiately and NOT whenever the JVM feels like it.
242 try{
243 if(!socket.isInputShutdown()) socket.shutdownInput();
244 }catch(IOException ioex){
245 // Ignore
246 }
247 try{
248 if(!socket.isOutputShutdown()) socket.shutdownOutput();
249 }catch(IOException ioex){
250 // Ignore
251 }
252
253 try{
254 if(!socket.isClosed()) socket.close();
255 }catch(IOException ioex){
256 LoggerFactory.log("Failed to close the socket with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
257 }
258 }
259 }