001    package toolbus.communication;
002    
003    import java.io.IOException;
004    import java.nio.ByteBuffer;
005    import java.nio.channels.SocketChannel;
006    import jjtraveler.VisitFailure;
007    import toolbus.TBTermFactory;
008    import toolbus.logging.ILogger;
009    import toolbus.logging.IToolBusLoggerConstants;
010    import toolbus.logging.LoggerFactory;
011    import toolbus.util.NativeTypeConverter;
012    import toolbus.util.collections.RotatingQueue;
013    import aterm.ATerm;
014    import aterm.pure.binary.BinaryReader;
015    import aterm.pure.binary.BinaryWriter;
016    
017    /**
018     * This class handles all communication with remote applications through sockets. It uses NIO /
019     * asynchroneous I/O. It will use 64k of staticly allocated native memory for I/O buffers.
020     * 
021     * @author Arnold Lankamp
022     */
023    public class SocketIOHandler implements IIOHandler{
024            public final static byte END_OPC = 127;
025    
026            private final static byte OPERATIONBYTES = 1;
027            private final static byte BLOCKLENGTHBYTES = 2;
028    
029            private final static int BUFFERSIZE = 32768;
030    
031            private final ByteBuffer operationReadBuffer;
032            private final ByteBuffer lengthReadBuffer;
033            private final ByteBuffer readBuffer;
034            private final ByteBuffer lengthWriteBuffer;
035            private final ByteBuffer writeBuffer;
036    
037            private final SocketChannel socketChannel;
038            private final AbstractConnectionHandler connectionHandler;
039            private final IDataHandler dataHandler;
040    
041            private boolean expectingDisconnect = false;
042    
043            private BinaryReader binaryReader = null;
044            private byte operation = -1;
045            private int blockLength = -1;
046    
047            private final RotatingQueue<OperationTermPair> writeQueue;
048    
049            private BinaryWriter binaryWriter = null;
050            private boolean doneWithBlock = false;
051    
052            /**
053             * Constructor.
054             * 
055             * @param dataHandler
056             *            The data handler that is associated with this socket I/O handler.
057             * @param connectionHandler
058             *            The connection handler that created this socket I/O handler.
059             * @param socketChannel
060             *            The socket channel that will be used by this socket I/O handler.
061             */
062            public SocketIOHandler(IDataHandler dataHandler, AbstractConnectionHandler connectionHandler, SocketChannel socketChannel){
063                    super();
064    
065                    this.dataHandler = dataHandler;
066                    this.connectionHandler = connectionHandler;
067                    this.socketChannel = socketChannel;
068    
069                    writeQueue = new RotatingQueue<OperationTermPair>();
070    
071                    operationReadBuffer = ByteBuffer.allocateDirect(OPERATIONBYTES);
072                    lengthReadBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES);
073                    readBuffer = ByteBuffer.allocateDirect(BUFFERSIZE);
074                    lengthWriteBuffer = ByteBuffer.allocateDirect(BLOCKLENGTHBYTES);
075                    writeBuffer = ByteBuffer.allocateDirect(BUFFERSIZE);
076            }
077    
078            /**
079             * Adds the term to the list of terms that needs to be send. The terms will be send in the same
080             * order as they arrived at this method.
081             * 
082             * @see IIOHandler#send(byte, ATerm)
083             */
084            public void send(byte op, ATerm aTerm){
085                    OperationTermPair otp = new OperationTermPair();
086                    otp.operation = op;
087                    otp.term = aTerm;
088                    synchronized(writeQueue){
089                            writeQueue.put(otp);
090                    }
091                    IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer();
092                    writeMultiplexer.registerForWrite(socketChannel, this);
093            }
094    
095            /**
096             * @see IIOHandler#receive(byte, ATerm)
097             */
098            public void receive(byte op, ATerm aTerm){
099                    dataHandler.receive(op, aTerm);
100            }
101    
102            /**
103             * This method triggers the reading of data from the socket. Partial reads are possible, it will
104             * continue where it left off. IMPORTANT: Do not call this method concurrently; it will lead to
105             * undefined behaviour.
106             */
107            protected void read(){
108                    boolean connected = true;
109    
110                    if(operation == -1) connected = receiveOperation();
111                    if(connected && operation != -1 && blockLength == -1) connected = receiveBlockLength();
112                    if(connected && blockLength != -1) connected = receiveDataBlock();
113                    
114                    if(!connected){
115                            handleDisconnect();
116                    }else if(binaryReader.isDone()){
117                            ATerm term = binaryReader.getRoot();
118                            if(operation == END_OPC){
119                                    shutDown();
120                                    connectionHandler.closeConnection(socketChannel);
121                            }else{
122                                    receive(operation, term);
123                            }
124    
125                            binaryReader = null;
126                            operation = -1;
127                    }
128            }
129    
130            /**
131             * Reads the operation byte from the socket channel.
132             * 
133             * @return False if we received a disconnect, true otherwise.
134             */
135            private boolean receiveOperation(){
136                    boolean connected = readFromChannel(operationReadBuffer);
137    
138                    if(!operationReadBuffer.hasRemaining() && connected){
139                            operationReadBuffer.flip();
140                            operation = operationReadBuffer.get();
141                            operationReadBuffer.clear();
142                    }
143    
144                    // Initialize a new reader for this operation
145                    binaryReader = new BinaryReader(TBTermFactory.getInstance());
146    
147                    return connected;
148            }
149    
150            /**
151             * Reads the data block length from the socket channel (2 bytes, little endian encoded).
152             * 
153             * @return False if we received a disconnect, true otherwise.
154             */
155            private boolean receiveBlockLength(){
156                    boolean connected = readFromChannel(lengthReadBuffer);
157    
158                    if(!lengthReadBuffer.hasRemaining() && connected){
159                            lengthReadBuffer.flip();
160                            byte[] blockLengthBytes = new byte[lengthReadBuffer.limit()];
161                            lengthReadBuffer.get(blockLengthBytes);
162    
163                            blockLength = NativeTypeConverter.makeUnsignedShort(blockLengthBytes);
164    
165                            lengthReadBuffer.clear();
166    
167                            readBuffer.clear();
168                            readBuffer.limit(blockLength);
169                    }
170    
171                    return connected;
172            }
173    
174            /**
175             * Reads the data from the current data block from the socket channel.
176             * 
177             * @return False if we received a disconnect, true otherwise.
178             */
179            private boolean receiveDataBlock(){
180                    boolean connected = readFromChannel(readBuffer);
181    
182                    if(!readBuffer.hasRemaining()){
183                            readBuffer.flip();
184    
185                            binaryReader.deserialize(readBuffer);
186    
187                            blockLength = -1;
188                    }
189    
190                    return connected;
191            }
192    
193            /**
194             * Reads bytes from the socket channel and inserts them into the given byte buffer.
195             * 
196             * @param buffer
197             *            The buffer to insert the received data into.
198             * @return False if we received a disconnect, true otherwise.
199             */
200            private boolean readFromChannel(ByteBuffer buffer){
201                    boolean connected = true;
202                    try{
203                            int bytesRead = socketChannel.read(buffer);
204                            if(bytesRead == -1) connected = false;
205                    }catch(IOException ioex){
206                            LoggerFactory.log("An IOException occured while reading from a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
207                            connectionHandler.closeDueToException(socketChannel, this);
208                            throw new RuntimeException(ioex);
209                    }
210    
211                    return connected;
212            }
213    
214            /**
215             * This method triggers the writing of data from the socket. Partial writes are possible, it
216             * will continue where it left off. IMPORTANT: Do not call this method concurrently; it will
217             * lead to undefined behaviour.
218             */
219            protected void write(){
220                    if(binaryWriter == null){
221                            OperationTermPair otp;
222                            synchronized(writeQueue){
223                                    if(writeQueue.isEmpty()) return; // Nothing to write.
224                                    otp = writeQueue.get();
225                            }
226                            sendOperation(otp.operation);
227    
228                            binaryWriter = new BinaryWriter(otp.term);
229                            doneWithBlock = true;
230                    }
231    
232                    if(doneWithBlock){
233                            try{
234                                    writeBuffer.clear();
235                                    binaryWriter.serialize(writeBuffer);
236                            }catch(VisitFailure vf){
237                                    // Bogus catch block, this never happens.
238                            }
239                            sendBlockLength(writeBuffer.limit());
240                    }
241    
242                    doneWithBlock = sendDataBlock();
243    
244                    if(doneWithBlock && binaryWriter.isFinished()) binaryWriter = null;
245            }
246    
247            /**
248             * Writes the operation byte to the stream.
249             * 
250             * @param op
251             *            The operation byte that indicates what the upcoming data package represents.
252             */
253            private void sendOperation(byte op){
254                    writeBuffer.clear();
255                    writeBuffer.put(op);
256                    writeBuffer.flip();
257    
258                    forcedWrite(writeBuffer);
259            }
260    
261            /**
262             * Writes the block length to the socket channel.
263             * 
264             * @param length
265             *            The length of the block of data that will be written.
266             */
267            private void sendBlockLength(int length){
268                    lengthWriteBuffer.clear();
269                    lengthWriteBuffer.put(NativeTypeConverter.makeBytesFromUnsignedShort(length));
270                    lengthWriteBuffer.flip();
271    
272                    forcedWrite(lengthWriteBuffer);
273            }
274    
275            /**
276             * Writes a block of data to the socket channel.
277             * 
278             * @return True if the entire block of data has been written to the socket channel, false
279             *         otherwise.
280             */
281            private boolean sendDataBlock(){
282                    try{
283                            socketChannel.write(writeBuffer);
284                    }catch(IOException ioex){
285                            LoggerFactory.log("An error occured while writing the end of stream byte to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
286                            connectionHandler.closeDueToException(socketChannel, this);
287                            throw new RuntimeException(ioex);
288                    }
289    
290                    return !writeBuffer.hasRemaining();
291            }
292    
293            /**
294             * Forces the writing of the content of the given buffer to the socket channel. Use this method
295             * for writing small amounts of data (no more then a few bytes) that need to be send in one
296             * piece.
297             * 
298             * @param buffer
299             *            The buffer to write to.
300             */
301            private void forcedWrite(ByteBuffer buffer){
302                    while(buffer.hasRemaining()){
303                            try{
304                                    socketChannel.write(buffer);
305                            }catch(IOException ioex){
306                                    LoggerFactory.log("An error occured while writing to a socket channel.", ioex, ILogger.ERROR, IToolBusLoggerConstants.COMMUNICATION);
307                                    connectionHandler.closeDueToException(socketChannel, this);
308                                    throw new RuntimeException(ioex);
309                            }
310    
311                            // Yield when needed.
312                            if(buffer.hasRemaining()) Thread.yield();
313                    }
314            }
315    
316            /**
317             * Checks if there is data left that needs to be written.
318             * 
319             * @return True if there is data left that needs to be written; false otherwise.
320             */
321            public boolean hasMoreToWrite(){
322                    boolean hasMoreToWrite = (binaryWriter != null);
323                    if(!hasMoreToWrite){
324                            synchronized(writeQueue){
325                                    hasMoreToWrite = !writeQueue.isEmpty();
326                            }
327                    }
328    
329                    return hasMoreToWrite;
330            }
331    
332            /**
333             * Terminates the connection with the in this socket I/O handler contained socket channel in an
334             * orderly manner. All data that was queued for writing will be written to the socket before the
335             * end message will be send.
336             * 
337             * @see IIOHandler#terminate()
338             */
339            public void terminate(){
340                    OperationTermPair otp = new OperationTermPair();
341                    otp.operation = END_OPC;
342                    otp.term = TBTermFactory.getInstance().makeList();
343    
344                    synchronized(writeQueue){
345                            writeQueue.put(otp);
346                    }
347    
348                    IWriteMultiplexer writeMultiplexer = connectionHandler.getWriteMultiplexer();
349                    writeMultiplexer.registerForWrite(socketChannel, this);
350    
351                    expectingDisconnect = true;
352            }
353    
354            /**
355             * @see IIOHandler#shutDown()
356             */
357            public void shutDown(){
358                    dataHandler.shutDown();
359            }
360            
361            /**
362             * @see IIOHandler#exceptionOccured()
363             */
364            public void exceptionOccured(){
365                    dataHandler.exceptionOccured();
366            }
367    
368            /**
369             * Handles the received disconnect. The proper close method on the connection handler will be
370             * called depending on whether or not we anticipated the disconnect.
371             */
372            private void handleDisconnect(){
373                    if(expectingDisconnect) connectionHandler.closeConnection(socketChannel);
374                    else connectionHandler.closeDueToDisconnect(socketChannel, this);
375            }
376    
377            /**
378             * A structure that links a term with an operation.
379             * 
380             * @author Arnold Lankamp
381             */
382            protected static class OperationTermPair{
383                    public byte operation = -1;
384                    public ATerm term = null;
385            }
386    }