001 package toolbus.communication; 002 003 import java.io.IOException; 004 import java.nio.ByteBuffer; 005 import java.nio.channels.SocketChannel; 006 import jjtraveler.VisitFailure; 007 import toolbus.TBTermFactory; 008 import toolbus.logging.ILogger; 009 import toolbus.logging.IToolBusLoggerConstants; 010 import toolbus.logging.LoggerFactory; 011 import toolbus.util.NativeTypeConverter; 012 import toolbus.util.collections.RotatingQueue; 013 import aterm.ATerm; 014 import aterm.pure.binary.BinaryReader; 015 import aterm.pure.binary.BinaryWriter; 016 017 /** 018 * This class handles all communication with remote applications through sockets. It uses NIO / 019 * asynchroneous I/O. It will use 64k of staticly allocated native memory for I/O buffers. 020 * 021 * @author Arnold Lankamp 022 */ 023 public class SocketIOHandler implements IIOHandler{ 024 public final static byte END_OPC = 127; 025 026 private final static byte OPERATIONBYTES = 1; 027 private final static byte BLOCKLENGTHBYTES = 2; 028 029 private final static int BUFFERSIZE = 32768; 030 031 private final ByteBuffer operationReadBuffer; 032 private final ByteBuffer lengthReadBuffer; 033 private final ByteBuffer readBuffer; 034 private final ByteBuffer lengthWriteBuffer; 035 private final ByteBuffer writeBuffer; 036 037 private final SocketChannel socketChannel; 038 private final AbstractConnectionHandler connectionHandler; 039 private final IDataHandler dataHandler; 040 041 private boolean expectingDisconnect = false; 042 043 private BinaryReader binaryReader = null; 044 private byte operation = -1; 045 private int blockLength = -1; 046 047 private final RotatingQueue<OperationTermPair> writeQueue; 048 049 private BinaryWriter binaryWriter = null; 050 private boolean doneWithBlock = false; 051 052 /** 053 * Constructor. 054 * 055 * @param dataHandler 056 * The data handler that is associated with this socket I/O handler. 057 * @param connectionHandler 058 * The connection handler that created this socket I/O handler. 059 * @param socketChannel 060 * The socket channel that will be used by this socket I/O handler. 061 */ 062 public SocketIOHandler(IDataHandler dataHandler, AbstractConnectionHandler connectionHandler, SocketChannel socketChannel){ 063 super(); 064 065 this.dataHandler = dataHandler; 066 this.connectionHandler = connectionHandler; 067 this.socketChannel = socketChannel; 068 069 writeQueue = new RotatingQueue<OperationTermPair>(); 070 071 operationReadBuffer = ByteBuffer.allocateDirect(OPERATIONBYTES); 072 lengthReadBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES); 073 readBuffer = ByteBuffer.allocateDirect(BUFFERSIZE); 074 lengthWriteBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES); 075 writeBuffer = ByteBuffer.allocateDirect(BUFFERSIZE); 076 } 077 078 /** 079 * Adds the term to the list of terms that needs to be send. The terms will be send in the same 080 * order as they arrived at this method. 081 * 082 * @see IIOHandler#send(byte, ATerm) 083 */ 084 public void send(byte op, ATerm aTerm){ 085 OperationTermPair otp = new OperationTermPair(); 086 otp.operation = op; 087 otp.term = aTerm; 088 synchronized(writeQueue){ 089 writeQueue.put(otp); 090 } 091 IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer(); 092 writeMultiplexer.registerForWrite(socketChannel, this); 093 } 094 095 /** 096 * @see IIOHandler#receive(byte, ATerm) 097 */ 098 public void receive(byte op, ATerm aTerm){ 099 dataHandler.receive(op, aTerm); 100 } 101 102 /** 103 * This method triggers the reading of data from the socket. Partial reads are possible, it will 104 * continue where it left off. IMPORTANT: Do not call this method concurrently; it will lead to 105 * undefined behaviour. 106 */ 107 protected void read(){ 108 boolean connected = true; 109 110 if(operation == -1) connected = receiveOperation(); 111 if(connected && operation != -1 && blockLength == -1) connected = receiveBlockLength(); 112 if(connected && blockLength != -1) connected = receiveDataBlock(); 113 114 if(!connected){ 115 handleDisconnect(); 116 }else if(binaryReader.isDone()){ 117 ATerm term = binaryReader.getRoot(); 118 if(operation == END_OPC){ 119 shutDown(); 120 connectionHandler.closeConnection(socketChannel); 121 }else{ 122 receive(operation, term); 123 } 124 125 binaryReader = null; 126 operation = -1; 127 } 128 } 129 130 /** 131 * Reads the operation byte from the socket channel. 132 * 133 * @return False if we received a disconnect, true otherwise. 134 */ 135 private boolean receiveOperation(){ 136 boolean connected = readFromChannel(operationReadBuffer); 137 138 if(!operationReadBuffer.hasRemaining() && connected){ 139 operationReadBuffer.flip(); 140 operation = operationReadBuffer.get(); 141 operationReadBuffer.clear(); 142 } 143 144 // Initialize a new reader for this operation 145 binaryReader = new BinaryReader(TBTermFactory.getInstance()); 146 147 return connected; 148 } 149 150 /** 151 * Reads the data block length from the socket channel (2 bytes, little endian encoded). 152 * 153 * @return False if we received a disconnect, true otherwise. 154 */ 155 private boolean receiveBlockLength(){ 156 boolean connected = readFromChannel(lengthReadBuffer); 157 158 if(!lengthReadBuffer.hasRemaining() && connected){ 159 lengthReadBuffer.flip(); 160 byte[] blockLengthBytes = new byte[lengthReadBuffer.limit()]; 161 lengthReadBuffer.get(blockLengthBytes); 162 163 blockLength = NativeTypeConverter.makeUnsignedShort(blockLengthBytes); 164 165 lengthReadBuffer.clear(); 166 167 readBuffer.clear(); 168 readBuffer.limit(blockLength); 169 } 170 171 return connected; 172 } 173 174 /** 175 * Reads the data from the current data block from the socket channel. 176 * 177 * @return False if we received a disconnect, true otherwise. 178 */ 179 private boolean receiveDataBlock(){ 180 boolean connected = readFromChannel(readBuffer); 181 182 if(!readBuffer.hasRemaining()){ 183 readBuffer.flip(); 184 185 binaryReader.deserialize(readBuffer); 186 187 blockLength = -1; 188 } 189 190 return connected; 191 } 192 193 /** 194 * Reads bytes from the socket channel and inserts them into the given byte buffer. 195 * 196 * @param buffer 197 * The buffer to insert the received data into. 198 * @return False if we received a disconnect, true otherwise. 199 */ 200 private boolean readFromChannel(ByteBuffer buffer){ 201 boolean connected = true; 202 try{ 203 int bytesRead = socketChannel.read(buffer); 204 if(bytesRead == -1) connected = false; 205 }catch(IOException ioex){ 206 LoggerFactory.log("An IOException occured while reading from a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 207 connectionHandler.closeDueToException(socketChannel, this); 208 throw new RuntimeException(ioex); 209 } 210 211 return connected; 212 } 213 214 /** 215 * This method triggers the writing of data from the socket. Partial writes are possible, it 216 * will continue where it left off. IMPORTANT: Do not call this method concurrently; it will 217 * lead to undefined behaviour. 218 */ 219 protected void write(){ 220 if(binaryWriter == null){ 221 OperationTermPair otp; 222 synchronized(writeQueue){ 223 if(writeQueue.isEmpty()) return; // Nothing to write. 224 otp = writeQueue.get(); 225 } 226 sendOperation(otp.operation); 227 228 binaryWriter = new BinaryWriter(otp.term); 229 doneWithBlock = true; 230 } 231 232 if(doneWithBlock){ 233 try{ 234 writeBuffer.clear(); 235 binaryWriter.serialize(writeBuffer); 236 }catch(VisitFailure vf){ 237 // Bogus catch block, this never happens. 238 } 239 sendBlockLength(writeBuffer.limit()); 240 } 241 242 doneWithBlock = sendDataBlock(); 243 244 if(doneWithBlock && binaryWriter.isFinished()) binaryWriter = null; 245 } 246 247 /** 248 * Writes the operation byte to the stream. 249 * 250 * @param op 251 * The operation byte that indicates what the upcoming data package represents. 252 */ 253 private void sendOperation(byte op){ 254 writeBuffer.clear(); 255 writeBuffer.put(op); 256 writeBuffer.flip(); 257 258 forcedWrite(writeBuffer); 259 } 260 261 /** 262 * Writes the block length to the socket channel. 263 * 264 * @param length 265 * The length of the block of data that will be written. 266 */ 267 private void sendBlockLength(int length){ 268 lengthWriteBuffer.clear(); 269 lengthWriteBuffer.put(NativeTypeConverter.makeBytesFromUnsignedShort(length)); 270 lengthWriteBuffer.flip(); 271 272 forcedWrite(lengthWriteBuffer); 273 } 274 275 /** 276 * Writes a block of data to the socket channel. 277 * 278 * @return True if the entire block of data has been written to the socket channel, false 279 * otherwise. 280 */ 281 private boolean sendDataBlock(){ 282 try{ 283 socketChannel.write(writeBuffer); 284 }catch(IOException ioex){ 285 LoggerFactory.log("An error occured while writing the end of stream byte to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 286 connectionHandler.closeDueToException(socketChannel, this); 287 throw new RuntimeException(ioex); 288 } 289 290 return !writeBuffer.hasRemaining(); 291 } 292 293 /** 294 * Forces the writing of the content of the given buffer to the socket channel. Use this method 295 * for writing small amounts of data (no more then a few bytes) that need to be send in one 296 * piece. 297 * 298 * @param buffer 299 * The buffer to write to. 300 */ 301 private void forcedWrite(ByteBuffer buffer){ 302 while(buffer.hasRemaining()){ 303 try{ 304 socketChannel.write(buffer); 305 }catch(IOException ioex){ 306 LoggerFactory.log("An error occured while writing to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION); 307 connectionHandler.closeDueToException(socketChannel, this); 308 throw new RuntimeException(ioex); 309 } 310 311 // Yield when needed. 312 if(buffer.hasRemaining()) Thread.yield(); 313 } 314 } 315 316 /** 317 * Checks if there is data left that needs to be written. 318 * 319 * @return True if there is data left that needs to be written; false otherwise. 320 */ 321 public boolean hasMoreToWrite(){ 322 boolean hasMoreToWrite = (binaryWriter != null); 323 if(!hasMoreToWrite){ 324 synchronized(writeQueue){ 325 hasMoreToWrite = !writeQueue.isEmpty(); 326 } 327 } 328 329 return hasMoreToWrite; 330 } 331 332 /** 333 * Terminates the connection with the in this socket I/O handler contained socket channel in an 334 * orderly manner. All data that was queued for writing will be written to the socket before the 335 * end message will be send. 336 * 337 * @see IIOHandler#terminate() 338 */ 339 public void terminate(){ 340 OperationTermPair otp = new OperationTermPair(); 341 otp.operation = END_OPC; 342 otp.term = TBTermFactory.getInstance().makeList(); 343 344 synchronized(writeQueue){ 345 writeQueue.put(otp); 346 } 347 348 IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer(); 349 writeMultiplexer.registerForWrite(socketChannel, this); 350 351 expectingDisconnect = true; 352 } 353 354 /** 355 * @see IIOHandler#shutDown() 356 */ 357 public void shutDown(){ 358 dataHandler.shutDown(); 359 } 360 361 /** 362 * @see IIOHandler#exceptionOccured() 363 */ 364 public void exceptionOccured(){ 365 dataHandler.exceptionOccured(); 366 } 367 368 /** 369 * Handles the received disconnect. The proper close method on the connection handler will be 370 * called depending on whether or not we anticipated the disconnect. 371 */ 372 private void handleDisconnect(){ 373 if(expectingDisconnect) connectionHandler.closeConnection(socketChannel); 374 else connectionHandler.closeDueToDisconnect(socketChannel, this); 375 } 376 377 /** 378 * A structure that links a term with an operation. 379 * 380 * @author Arnold Lankamp 381 */ 382 protected static class OperationTermPair{ 383 public byte operation = -1; 384 public ATerm term = null; 385 } 386 }