001 package toolbus; 002 003 import java.io.IOException; 004 import java.net.InetSocketAddress; 005 import java.net.ServerSocket; 006 import java.net.Socket; 007 import java.net.SocketException; 008 import java.nio.ByteBuffer; 009 import java.nio.channels.ServerSocketChannel; 010 import java.nio.channels.SocketChannel; 011 import jjtraveler.VisitFailure; 012 import toolbus.communication.AbstractConnectionHandler; 013 import toolbus.communication.SocketIOHandler; 014 import toolbus.communication.SocketReadMultiplexer; 015 import toolbus.communication.SocketWriteMultiplexer; 016 import toolbus.exceptions.ToolBusException; 017 import toolbus.logging.ILogger; 018 import toolbus.logging.IToolBusLoggerConstants; 019 import toolbus.logging.LoggerFactory; 020 import toolbus.tool.ToolDefinition; 021 import toolbus.tool.ToolInstance; 022 import aterm.ATerm; 023 import aterm.ATermAppl; 024 import aterm.pure.PureFactory; 025 import aterm.pure.binary.BinaryReader; 026 import aterm.pure.binary.BinaryWriter; 027 028 /** 029 * This class handles the establishing and closing of all TCP/IP connections. 030 * 031 * @author Arnold Lankamp 032 */ 033 public class SocketConnectionHandler extends AbstractConnectionHandler implements Runnable{ 034 private final static int HANDSHAKEBUFFERSIZE = 4096; 035 036 private final ToolBus toolbus; 037 038 private final SocketReadMultiplexer readMultiplexer; 039 private final SocketWriteMultiplexer writeMultiplexer; 040 041 private final ServerSocketChannel serverSocketChannel; 042 private final ByteBuffer handShakeBuffer; 043 private volatile boolean running = false; 044 045 /** 046 * Constructor. 047 * 048 * @param toolbus 049 * A reference to the main class of the Toolbus. 050 */ 051 public SocketConnectionHandler(ToolBus toolbus){ 052 super(); 053 054 this.toolbus = toolbus; 055 056 try{ 057 serverSocketChannel = ServerSocketChannel.open(); 058 }catch(IOException ioex){ 059 LoggerFactory.log("Unable open a server socket.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION); 060 throw new RuntimeException(ioex); 061 } 062 063 readMultiplexer = new SocketReadMultiplexer(this); 064 writeMultiplexer = new SocketWriteMultiplexer(this); 065 066 handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE); 067 068 running = true; // This is placed here, instead of at the start of the run loop because of a potential concurrency problem. 069 } 070 071 /** 072 * Initializes the connection handler. Initialization MUST occur before executing the run method. 073 * 074 * @throws IOException 075 */ 076 public void initialize() throws IOException{ 077 ServerSocket serverSocket = serverSocketChannel.socket(); 078 serverSocket.bind(null); 079 serverSocketChannel.configureBlocking(true); 080 081 Thread readThread = new Thread(readMultiplexer); 082 readThread.setName("Read multiplexer"); 083 readThread.start(); 084 085 Thread writeThread = new Thread(writeMultiplexer); 086 writeThread.setName("Write multiplexer"); 087 writeThread.start(); 088 } 089 090 /** 091 * Initializes the connection handler on a user specified port. Initialization MUST occur before 092 * executing the run method. 093 * 094 * @throws IOException 095 */ 096 public void initialize(int port) throws IOException{ 097 ServerSocket serverSocket = serverSocketChannel.socket(); 098 serverSocket.bind(new InetSocketAddress(port)); 099 serverSocketChannel.configureBlocking(true); 100 101 Thread readThread = new Thread(readMultiplexer); 102 readThread.setName("Read multiplexer"); 103 readThread.start(); 104 105 Thread writeThread = new Thread(writeMultiplexer); 106 writeThread.setName("Write multiplexer"); 107 writeThread.start(); 108 } 109 110 /** 111 * Returns the port number the ToolBus is currently running on. 112 * 113 * @return The port number the ToolBus is currently running on. 114 */ 115 public int getPort(){ 116 return serverSocketChannel.socket().getLocalPort(); 117 } 118 119 /** 120 * @see AbstractConnectionHandler#getReadMultiplexer() 121 */ 122 public SocketReadMultiplexer getReadMultiplexer(){ 123 return readMultiplexer; 124 } 125 126 /** 127 * @see AbstractConnectionHandler#getWriteMultiplexer() 128 */ 129 public SocketWriteMultiplexer getWriteMultiplexer(){ 130 return writeMultiplexer; 131 } 132 133 /** 134 * Checks if this handler is running or not. 135 * 136 * @return True if this handler is running, false otherwise. 137 */ 138 public boolean isRunning(){ 139 return running; 140 } 141 142 /** 143 * Stops the execution of this handler. The running state will be set to false and the server 144 * socket will be closed. 145 */ 146 public void stopRunning(){ 147 running = false; 148 149 try{ 150 ServerSocket serverSocket = serverSocketChannel.socket(); 151 serverSocket.close(); 152 }catch(IOException ioex){ 153 LoggerFactory.log("An error occured while shutting down the connection handler.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 154 } 155 156 readMultiplexer.stopRunning(); 157 writeMultiplexer.stopRunning(); 158 } 159 160 /** 161 * Main loop of this handler. It tries to accept any incoming connection attempt. 162 */ 163 public void run(){ 164 while(running){ 165 try{ 166 acceptConnection(); 167 }catch(IOException ioex){ 168 if(running) LoggerFactory.log("An error occured while accepting a connection.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 169 running = false; // temporary fix to avoid a loop 170 } 171 } 172 } 173 174 /** 175 * Accepts a connection with a remote application. 176 * 177 * @throws IOException 178 * Thrown when something goes wrong while accepting the connection, registering it 179 * or instantiating the associated I/O handlers. 180 */ 181 private void acceptConnection() throws IOException{ 182 SocketChannel socketChannel = serverSocketChannel.accept(); 183 184 Socket socket = socketChannel.socket(); 185 // Disable Nagle's algorithm, we don't want the random 500ms delays. 186 socket.setTcpNoDelay(true); 187 try{ 188 // Set the traffic class to high throughput and low delay. 189 socket.setTrafficClass(0x18); 190 }catch(SocketException sex){ 191 // This catch block is only here because some operating systems have a problem with setting priorities. 192 } 193 194 ToolInstance toolInstance = null; 195 try{ 196 toolInstance = shakeHands(socketChannel); 197 }catch(IOException ioex){ 198 LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION); 199 closeConnection(socketChannel); 200 return; 201 }catch(ToolBusException tbex){ 202 LoggerFactory.log("Unable to shake hands with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION); 203 closeConnection(socketChannel); 204 return; 205 }catch(RuntimeException rex){ 206 LoggerFactory.log("An runtime exception occured while executing a handshake with a tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort(), rex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 207 closeConnection(socketChannel); 208 return; 209 } 210 211 if(toolInstance == null){ 212 LoggerFactory.log("Tool at: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ", didn't supply the the expected interface.", ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION); 213 closeConnection(socketChannel); 214 return; 215 } 216 217 socketChannel.configureBlocking(false); 218 219 // Set up handlers 220 SocketIOHandler ioHandler = new SocketIOHandler(toolInstance, this, socketChannel); 221 toolInstance.setIOHandler(ioHandler); 222 223 readMultiplexer.registerForRead(socketChannel, ioHandler); 224 } 225 226 private ToolInstance shakeHands(SocketChannel socketChannel) throws IOException, ToolBusException{ 227 socketChannel.configureBlocking(true); 228 229 // Receive tool id. 230 ATermAppl toolKey = (ATermAppl) readTermFromChannel(toolbus.getTBTermFactory(), socketChannel, handShakeBuffer); 231 String toolName = toolKey.getAFun().getName(); 232 233 ToolDefinition toolDef = toolbus.getToolDefinition(toolName); 234 if(toolDef == null){ 235 String error = "No toolDef found for tool with name: " + toolName; 236 LoggerFactory.log(error, ILogger.WARNING, IToolBusLoggerConstants.COMMUNICATION); 237 throw new RuntimeException(error); 238 } 239 240 ToolInstanceManager toolInstanceManager = toolbus.getToolInstanceManager(); 241 242 ToolInstance toolInstance = toolInstanceManager.getPendingTool(toolKey); 243 244 // If we didn't request the tool with the given id to execute, it's connecting on it's own 245 // initiative. 246 if(toolInstance == null){ 247 toolInstance = new ToolInstance(toolDef, toolbus); 248 toolInstanceManager.addDynamiclyConnectedTool(toolInstance); 249 250 LoggerFactory.log("Tool: " + toolInstance.getToolKey() + ", connected at its own initiative.", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION); 251 } 252 253 // Send the signature. 254 writeTermToChannel(toolDef.getSignature(), socketChannel, handShakeBuffer); 255 256 // Receive signature confirmation. 257 handShakeBuffer.clear(); 258 handShakeBuffer.limit(1); 259 socketChannel.read(handShakeBuffer); 260 handShakeBuffer.flip(); 261 byte signatureConfirmation = handShakeBuffer.get(); 262 if(signatureConfirmation != 1){// Not OK 263 toolInstance.kill(); 264 toolInstance = null; 265 }else{ 266 // Send the (permanent) tool key back to the tool. 267 writeTermToChannel(toolInstance.getToolKey(), socketChannel, handShakeBuffer); 268 } 269 270 return toolInstance; 271 } 272 273 /** 274 * Transmits the term to the given socket channel, using the given buffer. 275 * 276 * @param aTerm 277 * The term to write. 278 * @param socketChannel 279 * The channel to write to. 280 * @param byteBuffer 281 * The buffer to use. 282 * @throws IOException 283 * Thrown when something goes wrong while writing to the channel. 284 */ 285 private void writeTermToChannel(ATerm aTerm, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{ 286 BinaryWriter binaryWriter = new BinaryWriter(aTerm); 287 while(!binaryWriter.isFinished()){ 288 byteBuffer.clear(); 289 byteBuffer.position(2); 290 try{ 291 binaryWriter.serialize(byteBuffer); 292 }catch(VisitFailure vf){ 293 // Bogus catch block, this can't happen. 294 } 295 // Insert chunk size data. 296 int chunkSize = byteBuffer.limit() - 2; 297 byteBuffer.put(0, (byte) (chunkSize & 0x000000FF)); 298 byteBuffer.put(1, (byte) ((chunkSize & 0x0000FF00) >> 8)); 299 300 // Write chunk 301 socketChannel.write(byteBuffer); 302 } 303 } 304 305 /** 306 * Reads a term from the given channel, using the given buffer. 307 * 308 * @param factory 309 * The factory to use for parsing the term. 310 * @param socketChannel 311 * The channel to read from. 312 * @param byteBuffer 313 * The buffer to use. 314 * @return The term that was read. 315 * @throws IOException 316 * Thrown when something goes wrong while reading the term from the channel. 317 */ 318 private ATerm readTermFromChannel(PureFactory factory, SocketChannel socketChannel, ByteBuffer byteBuffer) throws IOException{ 319 BinaryReader binaryReader = new BinaryReader(factory); 320 while(!binaryReader.isDone()){ 321 byteBuffer.clear(); 322 byteBuffer.limit(2); 323 socketChannel.read(byteBuffer); 324 byteBuffer.flip(); 325 326 int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8); 327 byteBuffer.clear(); 328 byteBuffer.limit(chunkSize); 329 socketChannel.read(byteBuffer); 330 byteBuffer.flip(); 331 332 binaryReader.deserialize(byteBuffer); 333 } 334 335 return binaryReader.getRoot(); 336 } 337 338 /** 339 * @see AbstractConnectionHandler#closeConnection(SocketChannel) 340 */ 341 public void closeConnection(SocketChannel socketChannel){ 342 readMultiplexer.deregisterForRead(socketChannel); 343 writeMultiplexer.deregisterForWrite(socketChannel); 344 345 Socket socket = socketChannel.socket(); 346 LoggerFactory.log("Closing connection with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ILogger.INFO, IToolBusLoggerConstants.COMMUNICATION); 347 348 // Close the in- output stream of the socket to ensure that the file descriptors are closed 349 // immidiately and NOT whenever the JVM feels like it. 350 try{ 351 if(!socket.isInputShutdown()) socket.shutdownInput(); 352 }catch(IOException ioex){ 353 // Ignore 354 } 355 try{ 356 if(!socket.isOutputShutdown()) socket.shutdownOutput(); 357 }catch(IOException ioex){ 358 // Ignore 359 } 360 361 try{ 362 if(!socket.isClosed()) socket.close(); 363 }catch(IOException ioex){ 364 LoggerFactory.log("Failed to close the socket with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION); 365 } 366 367 try{ 368 if(socketChannel.isOpen()) socketChannel.close(); 369 }catch(IOException ioex){ 370 LoggerFactory.log("Failed to close the socket channel with: " + socket.getInetAddress().getHostName() + ":" + socket.getPort() + ".", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION); 371 } 372 } 373 }