001 package toolbus.communication;
002
003 import java.io.IOException;
004 import java.nio.ByteBuffer;
005 import java.nio.channels.SocketChannel;
006 import jjtraveler.VisitFailure;
007 import toolbus.TBTermFactory;
008 import toolbus.logging.ILogger;
009 import toolbus.logging.IToolBusLoggerConstants;
010 import toolbus.logging.LoggerFactory;
011 import toolbus.util.NativeTypeConverter;
012 import toolbus.util.collections.RotatingQueue;
013 import aterm.ATerm;
014 import aterm.pure.binary.BinaryReader;
015 import aterm.pure.binary.BinaryWriter;
016
017 /**
018 * This class handles all communication with remote applications through sockets. It uses NIO /
019 * asynchroneous I/O. It will use 64k of staticly allocated native memory for I/O buffers.
020 *
021 * @author Arnold Lankamp
022 */
023 public class SocketIOHandler implements IIOHandler{
024 public final static byte END_OPC = 127;
025
026 private final static byte OPERATIONBYTES = 1;
027 private final static byte BLOCKLENGTHBYTES = 2;
028
029 private final static int BUFFERSIZE = 32768;
030
031 private final ByteBuffer operationReadBuffer;
032 private final ByteBuffer lengthReadBuffer;
033 private final ByteBuffer readBuffer;
034 private final ByteBuffer lengthWriteBuffer;
035 private final ByteBuffer writeBuffer;
036
037 private final SocketChannel socketChannel;
038 private final AbstractConnectionHandler connectionHandler;
039 private final IDataHandler dataHandler;
040
041 private boolean expectingDisconnect = false;
042
043 private BinaryReader binaryReader = null;
044 private byte operation = -1;
045 private int blockLength = -1;
046
047 private final RotatingQueue<OperationTermPair> writeQueue;
048
049 private BinaryWriter binaryWriter = null;
050 private boolean doneWithBlock = false;
051
052 /**
053 * Constructor.
054 *
055 * @param dataHandler
056 * The data handler that is associated with this socket I/O handler.
057 * @param connectionHandler
058 * The connection handler that created this socket I/O handler.
059 * @param socketChannel
060 * The socket channel that will be used by this socket I/O handler.
061 */
062 public SocketIOHandler(IDataHandler dataHandler, AbstractConnectionHandler connectionHandler, SocketChannel socketChannel){
063 super();
064
065 this.dataHandler = dataHandler;
066 this.connectionHandler = connectionHandler;
067 this.socketChannel = socketChannel;
068
069 writeQueue = new RotatingQueue<OperationTermPair>();
070
071 operationReadBuffer = ByteBuffer.allocateDirect(OPERATIONBYTES);
072 lengthReadBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES);
073 readBuffer = ByteBuffer.allocateDirect(BUFFERSIZE);
074 lengthWriteBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES);
075 writeBuffer = ByteBuffer.allocateDirect(BUFFERSIZE);
076 }
077
078 /**
079 * Adds the term to the list of terms that needs to be send. The terms will be send in the same
080 * order as they arrived at this method.
081 *
082 * @see IIOHandler#send(byte, ATerm)
083 */
084 public void send(byte op, ATerm aTerm){
085 OperationTermPair otp = new OperationTermPair();
086 otp.operation = op;
087 otp.term = aTerm;
088 synchronized(writeQueue){
089 writeQueue.put(otp);
090 }
091 IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer();
092 writeMultiplexer.registerForWrite(socketChannel, this);
093 }
094
095 /**
096 * @see IIOHandler#receive(byte, ATerm)
097 */
098 public void receive(byte op, ATerm aTerm){
099 dataHandler.receive(op, aTerm);
100 }
101
102 /**
103 * This method triggers the reading of data from the socket. Partial reads are possible, it will
104 * continue where it left off. IMPORTANT: Do not call this method concurrently; it will lead to
105 * undefined behaviour.
106 */
107 protected void read(){
108 boolean connected = true;
109
110 if(operation == -1) connected = receiveOperation();
111 if(connected && operation != -1 && blockLength == -1) connected = receiveBlockLength();
112 if(connected && blockLength != -1) connected = receiveDataBlock();
113
114 if(!connected){
115 handleDisconnect();
116 }else if(binaryReader.isDone()){
117 ATerm term = binaryReader.getRoot();
118 if(operation == END_OPC){
119 shutDown();
120 connectionHandler.closeConnection(socketChannel);
121 }else{
122 receive(operation, term);
123 }
124
125 binaryReader = null;
126 operation = -1;
127 }
128 }
129
130 /**
131 * Reads the operation byte from the socket channel.
132 *
133 * @return False if we received a disconnect, true otherwise.
134 */
135 private boolean receiveOperation(){
136 boolean connected = readFromChannel(operationReadBuffer);
137
138 if(!operationReadBuffer.hasRemaining() && connected){
139 operationReadBuffer.flip();
140 operation = operationReadBuffer.get();
141 operationReadBuffer.clear();
142 }
143
144 // Initialize a new reader for this operation
145 binaryReader = new BinaryReader(TBTermFactory.getInstance());
146
147 return connected;
148 }
149
150 /**
151 * Reads the data block length from the socket channel (2 bytes, little endian encoded).
152 *
153 * @return False if we received a disconnect, true otherwise.
154 */
155 private boolean receiveBlockLength(){
156 boolean connected = readFromChannel(lengthReadBuffer);
157
158 if(!lengthReadBuffer.hasRemaining() && connected){
159 lengthReadBuffer.flip();
160 byte[] blockLengthBytes = new byte[lengthReadBuffer.limit()];
161 lengthReadBuffer.get(blockLengthBytes);
162
163 blockLength = NativeTypeConverter.makeUnsignedShort(blockLengthBytes);
164
165 lengthReadBuffer.clear();
166
167 readBuffer.clear();
168 readBuffer.limit(blockLength);
169 }
170
171 return connected;
172 }
173
174 /**
175 * Reads the data from the current data block from the socket channel.
176 *
177 * @return False if we received a disconnect, true otherwise.
178 */
179 private boolean receiveDataBlock(){
180 boolean connected = readFromChannel(readBuffer);
181
182 if(!readBuffer.hasRemaining()){
183 readBuffer.flip();
184
185 binaryReader.deserialize(readBuffer);
186
187 blockLength = -1;
188 }
189
190 return connected;
191 }
192
193 /**
194 * Reads bytes from the socket channel and inserts them into the given byte buffer.
195 *
196 * @param buffer
197 * The buffer to insert the received data into.
198 * @return False if we received a disconnect, true otherwise.
199 */
200 private boolean readFromChannel(ByteBuffer buffer){
201 boolean connected = true;
202 try{
203 int bytesRead = socketChannel.read(buffer);
204 if(bytesRead == -1) connected = false;
205 }catch(IOException ioex){
206 LoggerFactory.log("An IOException occured while reading from a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
207 connectionHandler.closeDueToException(socketChannel, this);
208 throw new RuntimeException(ioex);
209 }
210
211 return connected;
212 }
213
214 /**
215 * This method triggers the writing of data from the socket. Partial writes are possible, it
216 * will continue where it left off. IMPORTANT: Do not call this method concurrently; it will
217 * lead to undefined behaviour.
218 */
219 protected void write(){
220 if(binaryWriter == null){
221 OperationTermPair otp;
222 synchronized(writeQueue){
223 if(writeQueue.isEmpty()) return; // Nothing to write.
224 otp = writeQueue.get();
225 }
226 sendOperation(otp.operation);
227
228 binaryWriter = new BinaryWriter(otp.term);
229 doneWithBlock = true;
230 }
231
232 if(doneWithBlock){
233 try{
234 writeBuffer.clear();
235 binaryWriter.serialize(writeBuffer);
236 }catch(VisitFailure vf){
237 // Bogus catch block, this never happens.
238 }
239 sendBlockLength(writeBuffer.limit());
240 }
241
242 doneWithBlock = sendDataBlock();
243
244 if(doneWithBlock && binaryWriter.isFinished()) binaryWriter = null;
245 }
246
247 /**
248 * Writes the operation byte to the stream.
249 *
250 * @param op
251 * The operation byte that indicates what the upcoming data package represents.
252 */
253 private void sendOperation(byte op){
254 writeBuffer.clear();
255 writeBuffer.put(op);
256 writeBuffer.flip();
257
258 forcedWrite(writeBuffer);
259 }
260
261 /**
262 * Writes the block length to the socket channel.
263 *
264 * @param length
265 * The length of the block of data that will be written.
266 */
267 private void sendBlockLength(int length){
268 lengthWriteBuffer.clear();
269 lengthWriteBuffer.put(NativeTypeConverter.makeBytesFromUnsignedShort(length));
270 lengthWriteBuffer.flip();
271
272 forcedWrite(lengthWriteBuffer);
273 }
274
275 /**
276 * Writes a block of data to the socket channel.
277 *
278 * @return True if the entire block of data has been written to the socket channel, false
279 * otherwise.
280 */
281 private boolean sendDataBlock(){
282 try{
283 socketChannel.write(writeBuffer);
284 }catch(IOException ioex){
285 LoggerFactory.log("An error occured while writing the end of stream byte to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
286 connectionHandler.closeDueToException(socketChannel, this);
287 throw new RuntimeException(ioex);
288 }
289
290 return !writeBuffer.hasRemaining();
291 }
292
293 /**
294 * Forces the writing of the content of the given buffer to the socket channel. Use this method
295 * for writing small amounts of data (no more then a few bytes) that need to be send in one
296 * piece.
297 *
298 * @param buffer
299 * The buffer to write to.
300 */
301 private void forcedWrite(ByteBuffer buffer){
302 while(buffer.hasRemaining()){
303 try{
304 socketChannel.write(buffer);
305 }catch(IOException ioex){
306 LoggerFactory.log("An error occured while writing to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
307 connectionHandler.closeDueToException(socketChannel, this);
308 throw new RuntimeException(ioex);
309 }
310
311 // Yield when needed.
312 if(buffer.hasRemaining()) Thread.yield();
313 }
314 }
315
316 /**
317 * Checks if there is data left that needs to be written.
318 *
319 * @return True if there is data left that needs to be written; false otherwise.
320 */
321 public boolean hasMoreToWrite(){
322 boolean hasMoreToWrite = (binaryWriter != null);
323 if(!hasMoreToWrite){
324 synchronized(writeQueue){
325 hasMoreToWrite = !writeQueue.isEmpty();
326 }
327 }
328
329 return hasMoreToWrite;
330 }
331
332 /**
333 * Terminates the connection with the in this socket I/O handler contained socket channel in an
334 * orderly manner. All data that was queued for writing will be written to the socket before the
335 * end message will be send.
336 *
337 * @see IIOHandler#terminate()
338 */
339 public void terminate(){
340 OperationTermPair otp = new OperationTermPair();
341 otp.operation = END_OPC;
342 otp.term = TBTermFactory.getInstance().makeList();
343
344 synchronized(writeQueue){
345 writeQueue.put(otp);
346 }
347
348 IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer();
349 writeMultiplexer.registerForWrite(socketChannel, this);
350
351 expectingDisconnect = true;
352 }
353
354 /**
355 * @see IIOHandler#shutDown()
356 */
357 public void shutDown(){
358 dataHandler.shutDown();
359 }
360
361 /**
362 * @see IIOHandler#exceptionOccured()
363 */
364 public void exceptionOccured(){
365 dataHandler.exceptionOccured();
366 }
367
368 /**
369 * Handles the received disconnect. The proper close method on the connection handler will be
370 * called depending on whether or not we anticipated the disconnect.
371 */
372 private void handleDisconnect(){
373 if(expectingDisconnect) connectionHandler.closeConnection(socketChannel);
374 else connectionHandler.closeDueToDisconnect(socketChannel, this);
375 }
376
377 /**
378 * A structure that links a term with an operation.
379 *
380 * @author Arnold Lankamp
381 */
382 protected static class OperationTermPair{
383 public byte operation = -1;
384 public ATerm term = null;
385 }
386 }