001 package toolbus;
002
003 import java.io.IOException;
004 import java.net.InetSocketAddress;
005 import java.net.ServerSocket;
006 import java.net.Socket;
007 import java.net.SocketException;
008 import java.nio.ByteBuffer;
009 import java.nio.channels.ServerSocketChannel;
010 import java.nio.channels.SocketChannel;
011 import jjtraveler.VisitFailure;
012 import toolbus.communication.AbstractConnectionHandler;
013 import toolbus.communication.SocketIOHandler;
014 import toolbus.communication.SocketReadMultiplexer;
015 import toolbus.communication.SocketWriteMultiplexer;
016 import toolbus.exceptions.ToolBusException;
017 import toolbus.logging.ILogger;
018 import toolbus.logging.IToolBusLoggerConstants;
019 import toolbus.logging.LoggerFactory;
020 import toolbus.tool.ToolDefinition;
021 import toolbus.tool.ToolInstance;
022 import aterm.ATerm;
023 import aterm.ATermAppl;
024 import aterm.pure.PureFactory;
025 import aterm.pure.binary.BinaryReader;
026 import aterm.pure.binary.BinaryWriter;
027
028 /**
029 * This class handles the establishing and closing of all TCP/IP connections.
030 *
031 * @author Arnold Lankamp
032 */
033 public class SocketConnectionHandler extends AbstractConnectionHandler implements Runnable{
034 private final static int HANDSHAKEBUFFERSIZE = 4096;
035
036 private final ToolBus toolbus;
037
038 private final SocketReadMultiplexer readMultiplexer;
039 private final SocketWriteMultiplexer writeMultiplexer;
040
041 private final ServerSocketChannel serverSocketChannel;
042 private final ByteBuffer handShakeBuffer;
043 private volatile boolean running = false;
044
045 /**
046 * Constructor.
047 *
048 * @param toolbus
049 * A reference to the main class of the Toolbus.
050 */
051 public SocketConnectionHandler(ToolBus toolbus){
052 super();
053
054 this.toolbus = toolbus;
055
056 try{
057 serverSocketChannel = ServerSocketChannel.open();
058 }catch(IOException ioex){
059 LoggerFactory.log("Unable open a server socket.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
060 throw new RuntimeException(ioex);
061 }
062
063 readMultiplexer = new SocketReadMultiplexer(this);
064 writeMultiplexer = new SocketWriteMultiplexer(this);
065
066 handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
067
068 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem.
069 }
070
071 /**
072 * Initializes the connection handler. Initialization MUST occur before executing the run method.
073 *
074 * @throws IOException
075 */
076 public void initialize() throws IOException{
077 ServerSocket serverSocket = serverSocketChannel.socket();
078 serverSocket.bind(null);
079 serverSocketChannel.configureBlocking(true);
080
081 Thread readThread = new Thread(readMultiplexer);
082 readThread.setName("Read multiplexer");
083 readThread.start();
084
085 Thread writeThread = new Thread(writeMultiplexer);
086 writeThread.setName("Write multiplexer");
087 writeThread.start();
088 }
089
090 /**
091 * Initializes the connection handler on a user specified port. Initialization MUST occur before
092 * executing the run method.
093 *
094 * @throws IOException
095 */
096 public void initialize(int port) throws IOException{
097 ServerSocket serverSocket = serverSocketChannel.socket();
098 serverSocket.bind(new InetSocketAddress(port));
099 serverSocketChannel.configureBlocking(true);
100
101 Thread readThread = new Thread(readMultiplexer);
102 readThread.setName("Read multiplexer");
103 readThread.start();
104
105 Thread writeThread = new Thread(writeMultiplexer);
106 writeThread.setName("Write multiplexer");
107 writeThread.start();
108 }
109
110 /**
111 * Returns the port number the ToolBus is currently running on.
112 *
113 * @return The port number the ToolBus is currently running on.
114 */
115 public int getPort(){
116 return serverSocketChannel.socket().getLocalPort();
117 }
118
119 /**
120 * @see AbstractConnectionHandler#getReadMultiplexer()
121 */
122 public SocketReadMultiplexer getReadMultiplexer(){
123 return readMultiplexer;
124 }
125
126 /**
127 * @see AbstractConnectionHandler#getWriteMultiplexer()
128 */
129 public SocketWriteMultiplexer getWriteMultiplexer(){
130 return writeMultiplexer;
131 }
132
133 /**
134 * Checks if this handler is running or not.
135 *
136 * @return True if this handler is running, false otherwise.
137 */
138 public boolean isRunning(){
139 return running;
140 }
141
142 /**
143 * Stops the execution of this handler. The running state will be set to false and the server
144 * socket will be closed.
145 */
146 public void stopRunning(){
147 running = false;
148
149 try{
150 ServerSocket serverSocket = serverSocketChannel.socket();
151 serverSocket.close();
152 }catch(IOException ioex){
153 LoggerFactory.log("An error occured while shutting down the connection handler.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
154 }
155
156 readMultiplexer.stopRunning();
157 writeMultiplexer.stopRunning();
158 }
159
160 /**
161 * Main loop of this handler. It tries to accept any incoming connection attempt.
162 */
163 public void run(){
164 while(running){
165 try{
166 acceptConnection();
167 }catch(IOException ioex){
168 if(running) LoggerFactory.log("An error occured while accepting a connection.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
169 running = false; // temporary fix to avoid a loop
170 }
171 }
172 }
173
174 /**
175 * Accepts a connection with a remote application.
176 *
177 * @throws IOException
178 * Thrown when something goes wrong while accepting the connection, registering it
179 * or instantiating the associated I/O handlers.
180 */
181 private void acceptConnection() throws IOException{
182 SocketChannel socketChannel = serverSocketChannel.accept();
183
184 Socket socket = socketChannel.socket();
185 // Disable Nagle's algorithm, we don't want the random 500ms delays.
186 socket.setTcpNoDelay(true);
187 try{
188 // Set the traffic class to high throughput and low delay.
189 socket.setTrafficClass(0x18);
190 }catch(SocketException sex){
191 // This catch block is only here because some operating systems have a problem with setting priorities.
192 }
193
194 ToolInstance toolInstance = null;
195 try{
196 toolInstance = shakeHands(socketChannel);
197 }catch(IOException ioex){
198 LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
199 closeConnection(socketChannel);
200 return;
201 }catch(ToolBusException tbex){
202 LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
203 closeConnection(socketChannel);
204 return;
205 }catch(RuntimeException rex){
206 LoggerFactory.log("An runtime exception occured while executing a handshake with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
207 closeConnection(socketChannel);
208 return;
209 }
210
211 if(toolInstance == null){
212 LoggerFactory.log("Tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ", didn't supply the the expected interface.", ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
213 closeConnection(socketChannel);
214 return;
215 }
216
217 socketChannel.configureBlocking(false);
218
219 // Set up handlers
220 SocketIOHandler ioHandler = new SocketIOHandler(toolInstance, this, socketChannel);
221 toolInstance.setIOHandler(ioHandler);
222
223 readMultiplexer.registerForRead(socketChannel, ioHandler);
224 }
225
226 private ToolInstance shakeHands(SocketChannel socketChannel) throws IOException, ToolBusException{
227 socketChannel.configureBlocking(true);
228
229 // Receive tool id.
230 ATermAppl toolKey = (ATermAppl) readTermFromChannel(toolbus.getTBTermFactory(), socketChannel, handShakeBuffer);
231 String toolName = toolKey.getAFun().getName();
232
233 ToolDefinition toolDef = toolbus.getToolDefinition(toolName);
234 if(toolDef == null){
235 String error = "No toolDef found for tool with name: " + toolName;
236 LoggerFactory.log(error, ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION);
237 throw new RuntimeException(error);
238 }
239
240 ToolInstanceManager toolInstanceManager = toolbus.getToolInstanceManager();
241
242 ToolInstance toolInstance = toolInstanceManager.getPendingTool(toolKey);
243
244 // If we didn't request the tool with the given id to execute, it's connecting on it's own
245 // initiative.
246 if(toolInstance == null){
247 toolInstance = new ToolInstance(toolDef, toolbus);
248 toolInstanceManager.addDynamiclyConnectedTool(toolInstance);
249
250 LoggerFactory.log("Tool: " + toolInstance.getToolKey() + ", connected at its own initiative.", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
251 }
252
253 // Send the signature.
254 writeTermToChannel(toolDef.getSignature(), socketChannel, handShakeBuffer);
255
256 // Receive signature confirmation.
257 handShakeBuffer.clear();
258 handShakeBuffer.limit(1);
259 socketChannel.read(handShakeBuffer);
260 handShakeBuffer.flip();
261 byte signatureConfirmation = handShakeBuffer.get();
262 if(signatureConfirmation != 1){// Not OK
263 toolInstance.kill();
264 toolInstance = null;
265 }else{
266 // Send the (permanent) tool key back to the tool.
267 writeTermToChannel(toolInstance.getToolKey(), socketChannel, handShakeBuffer);
268 }
269
270 return toolInstance;
271 }
272
273 /**
274 * Transmits the term to the given socket channel, using the given buffer.
275 *
276 * @param aTerm
277 * The term to write.
278 * @param socketChannel
279 * The channel to write to.
280 * @param byteBuffer
281 * The buffer to use.
282 * @throws IOException
283 * Thrown when something goes wrong while writing to the channel.
284 */
285 private void writeTermToChannel(ATerm aTerm, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
286 BinaryWriter binaryWriter = new BinaryWriter(aTerm);
287 while(!binaryWriter.isFinished()){
288 byteBuffer.clear();
289 byteBuffer.position(2);
290 try{
291 binaryWriter.serialize(byteBuffer);
292 }catch(VisitFailure vf){
293 // Bogus catch block, this can't happen.
294 }
295 // Insert chunk size data.
296 int chunkSize = byteBuffer.limit() - 2;
297 byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
298 byteBuffer.put(1, (byte) ((chunkSize & 0x0000FF00) >> 8));
299
300 // Write chunk
301 socketChannel.write(byteBuffer);
302 }
303 }
304
305 /**
306 * Reads a term from the given channel, using the given buffer.
307 *
308 * @param factory
309 * The factory to use for parsing the term.
310 * @param socketChannel
311 * The channel to read from.
312 * @param byteBuffer
313 * The buffer to use.
314 * @return The term that was read.
315 * @throws IOException
316 * Thrown when something goes wrong while reading the term from the channel.
317 */
318 private ATerm readTermFromChannel(PureFactory factory, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{
319 BinaryReader binaryReader = new BinaryReader(factory);
320 while(!binaryReader.isDone()){
321 byteBuffer.clear();
322 byteBuffer.limit(2);
323 socketChannel.read(byteBuffer);
324 byteBuffer.flip();
325
326 int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
327 byteBuffer.clear();
328 byteBuffer.limit(chunkSize);
329 socketChannel.read(byteBuffer);
330 byteBuffer.flip();
331
332 binaryReader.deserialize(byteBuffer);
333 }
334
335 return binaryReader.getRoot();
336 }
337
338 /**
339 * @see AbstractConnectionHandler#closeConnection(SocketChannel)
340 */
341 public void closeConnection(SocketChannel socketChannel){
342 readMultiplexer.deregisterForRead(socketChannel);
343 writeMultiplexer.deregisterForWrite(socketChannel);
344
345 Socket socket = socketChannel.socket();
346 LoggerFactory.log("Closing connection with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION);
347
348 // Close the in- output stream of the socket to ensure that the file descriptors are closed
349 // immidiately and NOT whenever the JVM feels like it.
350 try{
351 if(!socket.isInputShutdown()) socket.shutdownInput();
352 }catch(IOException ioex){
353 // Ignore
354 }
355 try{
356 if(!socket.isOutputShutdown()) socket.shutdownOutput();
357 }catch(IOException ioex){
358 // Ignore
359 }
360
361 try{
362 if(!socket.isClosed()) socket.close();
363 }catch(IOException ioex){
364 LoggerFactory.log("Failed to close the socket with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
365 }
366
367 try{
368 if(socketChannel.isOpen()) socketChannel.close();
369 }catch(IOException ioex){
370 LoggerFactory.log("Failed to close the socket channel with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
371 }
372 }
373 }