001    package toolbus;
002    
003    import java.io.IOException;
004    import java.lang.management.ManagementFactory;
005    import java.lang.management.MemoryMXBean;
006    import java.lang.management.ThreadInfo;
007    import java.lang.management.ThreadMXBean;
008    import java.net.ConnectException;
009    import java.net.InetAddress;
010    import java.net.InetSocketAddress;
011    import java.net.Socket;
012    import java.net.UnknownHostException;
013    import java.nio.ByteBuffer;
014    import java.nio.channels.SocketChannel;
015    import java.util.HashMap;
016    import java.util.HashSet;
017    import java.util.LinkedList;
018    import java.util.List;
019    import java.util.Map;
020    import java.util.Queue;
021    import java.util.Set;
022    
023    import jjtraveler.VisitFailure;
024    import aterm.ATerm;
025    import aterm.ATermAppl;
026    import aterm.ATermFactory;
027    import aterm.ATermInt;
028    import aterm.ATermList;
029    import aterm.pure.PureFactory;
030    import aterm.pure.binary.BinaryReader;
031    import aterm.pure.binary.BinaryWriter;
032    
033    abstract public class AbstractTool implements Tool, Runnable, IOperations{
034            private final static int HANDSHAKEBUFFERSIZE = 4096;
035            
036            private final ByteBuffer writeBuffer;
037            private final ByteBuffer readBuffer;
038            private SocketChannel socketChannel = null;
039            
040            protected final ATermFactory factory;
041            
042            private String toolname;
043            private int toolid = -1;
044            
045            private String host;
046            private int port = -1;
047            
048            private final Map<Long, ThreadLocalJobQueue> threadLocalQueues;
049            private final Map<String, JobQueue> queues;
050            
051            private final WorkerQueue workerQueue;
052            
053            private ATerm empty;
054            
055            private Object lockObject;
056            
057            private volatile boolean running;
058            
059            private boolean connected;
060            
061            private boolean expectingDisconnect = false;
062            
063            public AbstractTool(ATermFactory factory){
064                    this.factory = factory;
065                    
066                    empty = factory.makeList();
067                    threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
068                    queues = new HashMap<String, JobQueue>();
069                    
070                    workerQueue = new WorkerQueue();
071                    
072                    lockObject = this;
073                    
074                    writeBuffer = ByteBuffer.allocateDirect(32768);
075                    readBuffer = ByteBuffer.allocateDirect(32768);
076            }
077            
078            public void init(String[] args) throws UnknownHostException{
079                    for(int i = 0; i < args.length; i++){
080                            if(args[i].equals("-TB_PORT")){
081                                    port = Integer.parseInt(args[++i]);
082                            }
083                            if(args[i].equals("-TB_HOST")){
084                                    host = args[++i];
085                            }
086                            if(args[i].equals("-TB_TOOL_NAME")){
087                                    toolname = args[++i];
088                            }
089                            if(args[i].equals("-TB_TOOL_ID")){
090                                    toolid = Integer.parseInt(args[++i]);
091                            }
092                    }
093                    
094                    if(host == null || port == -1){
095                            throw new RuntimeException("Dunno where the ToolBus is running, so can't connect.");
096                    }
097            }
098            
099            public void setLockObject(Object lockObject){
100                    this.lockObject = lockObject;
101            }
102            
103            public Object getLockObject(){
104                    return lockObject;
105            }
106            
107            public void connect() throws IOException{
108                    if(connected) throw new IOException("already connected to ToolBus");
109                    
110                    try{
111                            socketChannel = SocketChannel.open();
112                            Socket socket = socketChannel.socket();
113                            // Disable Nagle's algorithm, we don't want the random 500ms delays.
114                            socket.setTcpNoDelay(true);
115                            // Set the traffic class to high throughput and low delay.
116                            socket.setTrafficClass(0x18);
117    
118                            socketChannel.connect(new InetSocketAddress(host, port));
119                            socketChannel.configureBlocking(true);
120    
121                            shakeHands();
122                    }catch(ConnectException cex){
123                            throw new RuntimeException(cex);
124                    }catch(IOException ioex){
125                            closeConnection();
126                            throw new RuntimeException(ioex);
127                    }catch(RuntimeException rex){
128                            closeConnection();
129                            throw new RuntimeException(rex);
130                    }
131                    
132                    connected = true;
133            }
134            
135            public void connect(String tool_name, InetAddress address, int p) throws IOException{
136                    if(connected) throw new IOException("already connected to ToolBus");
137                    
138                    this.toolname = tool_name;
139                    this.host = address.getHostAddress();
140                    this.port = p;
141                    
142                    try{
143                            socketChannel = SocketChannel.open();
144                            Socket socket = socketChannel.socket();
145                            // Disable Nagle's algorithm, we don't want the random 500ms delays.
146                            socket.setTcpNoDelay(true);
147                            // Set the traffic class to high throughput and low delay.
148                            socket.setTrafficClass(0x18);
149    
150                            socketChannel.connect(new InetSocketAddress(host, port));
151                            socketChannel.configureBlocking(true);
152    
153                            shakeHands();
154                    }catch(ConnectException cex){
155                            throw new RuntimeException(cex);
156                    }catch(IOException ioex){
157                            closeConnection();
158                            throw new RuntimeException(ioex);
159                    }catch(RuntimeException rex){
160                            closeConnection();
161                            throw new RuntimeException(rex);
162                    }
163                    
164                    connected = true;
165            }
166            
167            public void disconnect(){
168                    connected = false;
169                    sendTerm(DISCONNECT, empty);
170                    
171                    expectingDisconnect = true;
172            }
173            
174            public boolean isConnected(){
175                    return connected;
176            }
177            
178            public int getPort(){
179                    return port;
180            }
181            
182            public InetAddress getAddress(){
183                    InetAddress adress;
184                    try{
185                            adress = InetAddress.getByName(host);
186                    }catch(UnknownHostException uhex){
187                            throw new RuntimeException("Unable to resolve ToolBus host adress.");
188                    }
189                    return adress;
190            }
191            
192            private void writeTermToChannel(ATerm aTerm, ByteBuffer byteBuffer) throws IOException{
193                    BinaryWriter binaryWriter = new BinaryWriter(aTerm);
194                    while(!binaryWriter.isFinished()){
195                            byteBuffer.clear();
196                            byteBuffer.position(2);
197                            try{
198                                    binaryWriter.serialize(byteBuffer);
199                            }catch(VisitFailure vf){
200                                    // Bogus catch block, this can't happen.
201                            }
202                            // Insert chunk size data.
203                            int chunkSize = byteBuffer.limit() - 2;
204                            byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
205                            byteBuffer.put(1, (byte) ((chunkSize >>> 8) & 0x000000FF));
206                            
207                            // Write chunk
208                            int byteWritten = socketChannel.write(byteBuffer);
209                            if(byteWritten == -1) throw new IOException("Tool disconnected");
210                    }
211            }
212            
213            private ATerm readTermFromChannel(ByteBuffer byteBuffer) throws IOException{
214                    BinaryReader binaryReader = new BinaryReader((PureFactory) factory);
215                    while(!binaryReader.isDone()){
216                            byteBuffer.clear();
217                            byteBuffer.limit(2);
218                            do{
219                                    int bytesRead = socketChannel.read(byteBuffer);
220                                    if(bytesRead == -1) throw new IOException("Tool disconnected");
221                            }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly.
222                            byteBuffer.flip();
223                            
224                            int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
225                            
226                            byteBuffer.clear();
227                            byteBuffer.limit(chunkSize);
228                            do{
229                                    int bytesRead = socketChannel.read(byteBuffer);
230                                    if(bytesRead == -1) throw new IOException("Tool disconnected");
231                            }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly.
232                            byteBuffer.flip();
233                            
234                            binaryReader.deserialize(byteBuffer);
235                    }
236                    
237                    return binaryReader.getRoot();
238            }
239            
240            private void shakeHands() throws IOException{
241                    ByteBuffer handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
242                    
243                    ATerm toolKey = factory.makeAppl(factory.makeAFun(toolname, 1, false), factory.makeInt(toolid));
244    
245                    // Send the tool id.
246                    writeTermToChannel(toolKey, handShakeBuffer);
247    
248                    // Receive signature.
249                    handShakeBuffer.clear();
250                    readTermFromChannel(handShakeBuffer);
251    
252                    // TODO Check the signature.
253                    byte sigOKByte = 1;
254    
255                    // Send signature confirmation.
256                    handShakeBuffer.clear();
257                    handShakeBuffer.put(sigOKByte);
258                    handShakeBuffer.flip();
259                    socketChannel.write(handShakeBuffer);
260    
261                    // Receive the (permanent) tool key.
262                    ATerm newToolKey = readTermFromChannel(handShakeBuffer);
263    
264                    int toolID = ((ATermInt) newToolKey.getChildAt(0)).getInt();
265                    toolid = toolID;
266                    
267                    sendTerm(CONNECT, newToolKey);
268            }
269            
270            private void sendTerm(byte operation, ATerm term){
271                    synchronized(writeBuffer){
272                            writeBuffer.clear();
273                            writeBuffer.put(operation);
274                            writeBuffer.flip();
275                            
276                            try{
277                                    int bytesWritten = socketChannel.write(writeBuffer);
278                                    if(bytesWritten == -1) throw new IOException("Tool disconnected");
279                                    
280                                    writeTermToChannel(term, writeBuffer);
281                            }catch(IOException ioex){
282                                    throw new RuntimeException(ioex);
283                            }
284                    }
285            }
286            
287            private class OperationTermPair{
288                    public byte operation;
289                    public ATerm aTerm;
290            }
291            
292            public OperationTermPair readTerm() throws IOException{
293                    OperationTermPair otp = new OperationTermPair();
294                    
295                    synchronized(readBuffer){
296                            readBuffer.clear();
297                            readBuffer.limit(1);
298                            int byteRead = socketChannel.read(readBuffer);
299                            if(byteRead == -1) return null;
300                            readBuffer.flip();
301                            
302                            otp.operation = readBuffer.get(0);
303                            
304                            otp.aTerm = readTermFromChannel(readBuffer);
305                    }
306                    
307                    return otp;
308            }
309            
310            private void setRunning(boolean state){
311                    running = state;
312            }
313            
314            public void run(){
315                    workerQueue.start();
316                    
317                    setRunning(true);
318                    try{
319                            while(running){
320                                    handleIncomingTerm();
321                            }
322                    }catch(IOException ioex){
323                            ioex.printStackTrace();
324                            System.exit(0);
325                    }
326            }
327            
328            public void stopRunning(){
329                    setRunning(false);
330                    workerQueue.terminate();
331            }
332            
333            public void handleIncomingTerm() throws IOException{
334                    OperationTermPair otp = readTerm();
335                    if(otp == null){
336                            if(expectingDisconnect){
337                                    setRunning(false);
338                                    return;
339                            }
340                            throw new IOException("Tool disconnected.");
341                    }
342                    
343                    handleIncomingTerm(otp.operation, otp.aTerm);
344            }
345            
346            public void handleIncomingTerm(byte operation, ATerm t){
347                    handleTerm(operation, t);
348            }
349            
350            protected void handleTerm(byte operation, final ATerm t){
351                    switch(operation){
352                            case DO:
353                                    workerQueue.execute(new Runnable(){
354                                            public void run(){
355                                                    handler(factory.make("rec-do(<term>)", t));
356                                                    sendTerm(ACKDO, empty);
357                                            }
358                                    });
359                                    break;
360                            case EVAL:
361                                    workerQueue.execute(new Runnable(){
362                                            public void run(){
363                                                    ATermAppl result = (ATermAppl) handler(factory.make("rec-eval(<term>)", t));
364                                                    sendTerm(VALUE, result.getArgument(0));
365                                            }
366                                    });
367                                    break;
368                            case ACKEVENT:
369                                    ATerm ackEvent = ((ATermList) t).getLast();
370                                    handler(factory.make("rec-ack-event(<term>)", ackEvent));
371                                    ackEvent((ATermList) t);
372                                    break;
373                            case RESPONSE:
374                                    ATermAppl response = (ATermAppl) t;
375                                    String responseSourceName = response.getAFun().toString();
376                                    
377                                    JobQueue requestQueue;
378                                    synchronized(queues){
379                                            requestQueue = queues.get(responseSourceName);
380                                    }
381                                    if(requestQueue == null){
382                                            System.err.println("Received response on a non-existent request: " + responseSourceName);
383                                            return;
384                                    }
385                                    requestQueue.recResponse(response);
386                                    break;
387                            case TERMINATE:
388                                    workerQueue.execute(new Runnable(){
389                                            public void run(){
390                                                    handler(factory.make("rec-terminate(<term>)", t));
391                                                    expectingDisconnect = true;
392                                                    sendTerm(END, empty);
393                                            }
394                                    });
395                                    break;
396                            case PERFORMANCESTATS:
397                                    ATerm performaceStats = getPerformanceStats();
398                                    sendTerm(PERFORMANCESTATS, performaceStats);
399                                    break;
400                            case DEBUGPERFORMANCESTATS:
401                                    ATerm debugPerformaceStats = getPerformanceStats();
402                                    sendTerm(DEBUGPERFORMANCESTATS, debugPerformaceStats);
403                                    break;
404                            case END:
405                                    connected = false;
406                                    setRunning(false);
407                                    closeConnection();
408                                    break;
409                            default:
410                                    throw new RuntimeException("Unknown tool operation with ID: " + operation);
411                    }
412            }
413            
414            public void sendTerm(ATerm term) throws IOException{
415                    throw new IOException("Unsupported operation. Unable to send term: "+term);
416            }
417            
418            public void sendEvent(ATerm term){
419                    postEvent(term);
420            }
421            
422            public void postEvent(ATerm term){
423                    long threadId = Thread.currentThread().getId();
424                    ThreadLocalJobQueue threadLocalQueue = null;
425                    synchronized(threadLocalQueues){
426                            threadLocalQueue = threadLocalQueues.get(new Long(threadId));
427                            if(threadLocalQueue == null){
428                                    threadLocalQueue = new ThreadLocalJobQueue();
429                                    threadLocalQueues.put(new Long(threadId), threadLocalQueue);
430                            }
431                    }
432                    threadLocalQueue.postEvent(term, threadId);
433            }
434            
435            public ATerm postRequest(ATerm term){
436                    long threadId = Thread.currentThread().getId();
437                    ThreadLocalJobQueue threadLocalQueue;
438                    synchronized(threadLocalQueues){
439                            threadLocalQueue = threadLocalQueues.get(new Long(threadId));
440                            if(threadLocalQueue == null){
441                                    threadLocalQueue = new ThreadLocalJobQueue();
442                                    threadLocalQueues.put(new Long(threadId), threadLocalQueue);
443                            }
444                    }
445                    Job job = threadLocalQueue.postRequest(term, threadId);
446                    return threadLocalQueue.waitForResponse(job);
447            }
448            
449            private void ackEvent(ATermList ackEvent){
450                    ATerm event = ackEvent.getFirst();
451                    
452                    String sourceName = ((ATermAppl) event).getAFun().toString();
453                    
454                    JobQueue eventQueue;
455                    synchronized(queues){
456                            eventQueue = queues.get(sourceName);
457                    }
458                    eventQueue.ackEvent();
459            }
460            
461            private void closeConnection(){
462                    Socket socket = socketChannel.socket();
463                    
464                    // Close the in- output stream of the socket to ensure that the file descriptors are closed
465                    // immidiately and NOT whenever the JVM feels like it.
466                    try{
467                            if(!socket.isInputShutdown()) socket.shutdownInput();
468                    }catch(IOException ioex){
469                            // Ignore
470                    }
471                    try{
472                            if(!socket.isOutputShutdown()) socket.shutdownOutput();
473                    }catch(IOException ioex){
474                            // Ignore
475                    }
476    
477                    try{
478                            if(!socket.isClosed()) socket.close();
479                    }catch(IOException ioex){
480                            ioex.printStackTrace();
481                    }
482            }
483    
484            /**
485             * A job.
486             * 
487             * @author Arnold Lankamp
488             */
489            private static class Job{
490                    public final byte operation;
491                    public final ATerm term;
492                    public final long threadId;
493                    public ATerm response; // Optional field
494                    
495                    /**
496                     * Constructor.
497                     * 
498                     * @param operation
499                     *            The op-code.
500                     * @param term
501                     *            The message associated with this event.
502                     * @param threadId
503                     *            The id of the thread associated with this event.
504                     */
505                    public Job(byte operation, ATerm term, long threadId){
506                            super();
507                            
508                            this.operation = operation;
509                            this.term = term;
510                            this.threadId = threadId;
511                    }
512            }
513    
514            /**
515             * This job queue holds all the jobs that are send from a single source.
516             * 
517             * @author Arnold Lankamp
518             */
519            private class JobQueue{
520                    private final List<Job> jobs;
521                    private Job current;
522    
523                    /**
524                     * Default constructor.
525                     */
526                    public JobQueue(){
527                            super();
528                            
529                            jobs = new LinkedList<Job>();
530                            
531                            current = null;
532                    }
533                    
534                    /**
535                     * Schedules the given job for transmission to the ToolBus. If there are currently no
536                     * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till
537                     * all the requests (of the same source) that where previously scheduled have been send and
538                     * acknowledged.
539                     * 
540                     * @param job
541                     *            A container that hold the details about the request.
542                     */
543                    public synchronized void post(Job job){
544                            if(current == null){
545                                    sendTerm(job.operation, job.term);
546                                    current = job;
547                            }else{
548                                    jobs.add(job);
549                            }
550                    }
551    
552                    /**
553                     * Returns the next job in the queue.
554                     * 
555                     * @return The next job in the queue; null if the queue is empty.
556                     */
557                    public synchronized Job getNext(){
558                            if(!jobs.isEmpty()) return jobs.remove(0);
559                            
560                            return null;
561                    }
562                    
563                    /**
564                     * Notifies the thread local queue of the acknowledgement and executes the next queued job
565                     * (if present).
566                     */
567                    private void acknowledge(){
568                            long threadId = current.threadId;
569                            ThreadLocalJobQueue threadLocalQueue;
570                            synchronized(threadLocalQueues){
571                                    threadLocalQueue = threadLocalQueues.get(new Long(threadId));
572                            }
573                            threadLocalQueue.acknowledge();
574                            
575                            Job next = getNext();
576                            current = next;
577                            if(next != null){
578                                    sendTerm(next.operation, next.term);
579                            }
580                    }
581    
582                    /**
583                     * Acknowledges the last event that was send from the source this queue is associated with.
584                     * It will execute the next job in the queue if there are any.
585                     */
586                    public synchronized void ackEvent(){
587                            acknowledge();
588                    }
589                    
590                    /**
591                     * Acknowledges the last request that was send from the source this queue is associated
592                     * with. It will execute the next job in the queue if there are any.
593                     * 
594                     * @param response
595                     *            The response.
596                     */
597                    public synchronized void recResponse(ATermAppl response){
598                            synchronized(current){
599                                    current.response = response;
600                                    current.notify();
601                            }
602                            
603                            acknowledge();
604                    }
605            }
606            
607            /**
608             * This job queue holds all the jobs that are posted by a certain thread.
609             * 
610             * @author Arnold Lankamp
611             */
612            private class ThreadLocalJobQueue{
613                    private final List<Job> requests;
614                    
615                    private boolean awaitingAck;
616    
617                    /**
618                     * Default constructor.
619                     */
620                    public ThreadLocalJobQueue(){
621                            super();
622                            
623                            requests = new LinkedList<Job>();
624                    }
625                    
626                    /**
627                     * Schedules the given event for transmission to the ToolBus. If there are currently no
628                     * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
629                     * to wait till all the jobs (associated with the current thread) that were previously
630                     * scheduled have been submitted to the request queue.
631                     * 
632                     * @param aTerm
633                     *            The term that hold the details about the event.
634                     * @param threadId
635                     *            The id of the thread associated with the event.
636                     */
637                    public synchronized void postEvent(ATerm aTerm, long threadId){
638                            Job request = new Job(EVENT, aTerm, threadId);
639                            
640                            if(!awaitingAck){
641                                    String sourceName = ((ATermAppl) aTerm).getAFun().getName();
642                                    
643                                    JobQueue requestQueue;
644                                    synchronized(queues){
645                                            requestQueue = queues.get(sourceName);
646                                            if(requestQueue == null){
647                                                    requestQueue = new JobQueue();
648                                                    queues.put(sourceName, requestQueue);
649                                            }
650                                    }
651                                    requestQueue.post(request);
652                                    awaitingAck = true;
653                            }else{
654                                    requests.add(request);
655                            }
656                    }
657                    
658                    /**
659                     * Schedules the given request for transmission to the ToolBus. If there are currently no
660                     * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
661                     * to wait till all the jobs (associated with the current thread) that were previously
662                     * scheduled have been submitted to the request queue.
663                     * 
664                     * @param aTerm
                     *            The term that holds the details about the request.
666                     * @param threadId
667                     *            The id of the thread associated with the request.
668                     * @return The received response on the issued request.
669                     */
670                    public synchronized Job postRequest(ATerm aTerm, long threadId){
671                            Job job = new Job(REQUEST, aTerm, threadId);
672                            if(!awaitingAck){
673                                    String sourceName = ((ATermAppl) aTerm).getAFun().getName();
674                                    
675                                    JobQueue requestQueue;
676                                    synchronized(queues){
677                                            requestQueue = queues.get(sourceName);
678                                            if(requestQueue == null){
679                                                    requestQueue = new JobQueue();
680                                                    queues.put(sourceName, requestQueue);
681                                            }
682                                    }
683                                    requestQueue.post(job);
684                                    awaitingAck = true;
685                            }else{
686                                    requests.add(job);
687                                    }
688                            return job;
689                    }
690                    
691                    public ATerm waitForResponse(Job job){
692                            synchronized(job){
693                                    while(job.response == null){
694                                            try{
695                                                    job.wait();
696                                            }catch(InterruptedException irex){
697                                                    // Ignore this, since I don't want to know about it.
698                                            }
699                                    }
700                            }
701                            return job.response;
702                    }
703                    
704                    /**
705                     * Returns the next job in the queue.
706                     * 
707                     * @return The next job in the queue; null if the queue is empty.
708                     */
709                    public synchronized Job getNext(){
710                            if(!requests.isEmpty()) return requests.remove(0);
711                            
712                            return null;
713                    }
714                    
715                    /**
                     * Acknowledges the last job that was sent from the source the current thread is associated
717                     * with. It will submit the next job in the queue if there are any.
718                     */
719                    public synchronized void acknowledge(){
720                            Job next = getNext();
721                            if(next == null){
722                                    awaitingAck = false;
723                            }else{
724                                    ATerm term = next.term;
725                                    String sourceName = ((ATermAppl) term).getAFun().getName();
726                                    
727                                    JobQueue requestQueue;
728                                    synchronized(queues){
729                                            requestQueue = queues.get(sourceName);
730                                            if(requestQueue == null){
731                                                    requestQueue = new JobQueue();
732                                                    queues.put(sourceName, requestQueue);
733                                            }
734                                    }
735                                    
736                                    requestQueue.post(next);
737                            }
738                    }
739            }
740            
741            /**
             * The queue that is meant to take care of the asynchronous execution and queueing of anything
743             * that invokes stuff on a tool.
744             * 
745             * @author Arnold Lankamp
746             */
747            private static class WorkerQueue{
748                    private final Object lock = new Object();
749                    
750                    private final Queue<Runnable> queue;
751                    private final Worker worker;
752                    
753                    /**
754                     * Default constructor.
755                     */
756                    public WorkerQueue(){
757                            super();
758                            
759                            this.queue = new LinkedList<Runnable>();
760                            this.worker = new Worker();
761                            worker.setDaemon(true);
762                    }
763                    
764                    /**
765                     * Starts the worker thread.
766                     */
767                    public void start(){
768                            worker.start();
769                    }
770                    
771                    /**
772                     * Executes or queues the given runnable for execution.
773                     * 
774                     * @param r
775                     *            The runnable containing the stuff that needs to be executed.
776                     */
777                    public void execute(Runnable r){
778                            synchronized(lock){
779                                    boolean notBusy = queue.isEmpty();
780                                    queue.offer(r);
781                                    if(notBusy) lock.notify();
782                            }
783                    }
784                    
785                    /**
786                     * Terminates the worker thread as soon as is gracefully possible.
787                     */
788                    public void terminate(){
789                            worker.terminate();
790                    }
791                    
792                    /**
793                     * The worker thread of this queue.
794                     * 
795                     * @author Arnold Lankamp
796                     */
797                    private class Worker extends Thread{
798                            private volatile boolean running;
799                            
800                            /**
801                             * Default constructor.
802                             */
803                            public Worker(){
804                                    super();
805                                    
806                                    running = true;
807                            }
808                            
809                            /**
810                             * The main execution loop.
811                             */
812                            public void run(){
813                                    do{
814                                            Runnable r;
815                                            synchronized(lock){
816                                                    r = queue.poll();
817                                                    while(r == null){
818                                                            try{
819                                                                    lock.wait();
820                                                            }catch(InterruptedException irex){
821                                                                    // Ignore this.
822                                                            }
823                                                            if(!running) return;
824                                                            
825                                                            r = queue.poll();
826                                                    }
827                                            }
828                                            
829                                            r.run();
830                                    }while(running);
831                            }
832                            
833                            /**
834                             * Terminates this worker thread after the current iteration.
835                             */
836                            public void terminate(){
837                                    running = false;
838                                    synchronized(lock){
839                                            lock.notify();
840                                    }
841                            }
842                    }
843            }
844            
845            /**
             * Gathers performance statistics about this tool, like memory usage and the user-/system-time
             * spent per thread.
             * 
             * @return performance statistics.
850             */
851            private ATerm getPerformanceStats(){
852                    // Type stuff
853                    ATerm remote = factory.makeAppl(factory.makeAFun("legacy", 0, true));
854                    ATerm toolType = factory.makeAppl(factory.makeAFun("type", 1, false), remote);
855                    
856                    ATerm java = factory.makeAppl(factory.makeAFun("Java", 0, true));
857                    ATerm toolLanguage = factory.makeAppl(factory.makeAFun("language", 1, false), java);
858                    
859                    ATerm tool = factory.makeAppl(factory.makeAFun("tool", 2, false), toolType, toolLanguage);
860                    
861                    // Memory stuff
862                    MemoryMXBean mmxb = ManagementFactory.getMemoryMXBean();
863                    long heapMemoryUsage = mmxb.getHeapMemoryUsage().getUsed();
864                    long nonHeapMemoryUsage = mmxb.getNonHeapMemoryUsage().getUsed();
865                    
866                    ATerm heapUsage = factory.makeAppl(factory.makeAFun("heap-usage", 1, false), factory.makeInt(((int) (heapMemoryUsage / 1024))));
867                    ATerm nonHeapUsage = factory.makeAppl(factory.makeAFun("non-heap-usage", 1, false), factory.makeInt(((int) (nonHeapMemoryUsage / 1024))));
868                    
869                    ATerm memory = factory.makeAppl(factory.makeAFun("memory-usage", 2, false), heapUsage, nonHeapUsage);
870                    
871                    // Thread stuff
872                    ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
873                    Thread[] relevantThreads = new Thread[currentThreadGroup.activeCount() * 2 + 10]; // Create an array that's more then big enough.
874                    currentThreadGroup.enumerate(relevantThreads);
875                    
876                    Set<Long> relevantThreadIds = new HashSet<Long>();
877                    for(int i = 0; i < relevantThreads.length; i++){
878                            Thread thread = relevantThreads[i];
879                            if(thread != null){
880                                    relevantThreadIds.add(new Long(thread.getId()));
881                            }
882                    }
883                    
884                    ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
885                    
886                    ATerm threads;
887                    
888                    long[] threadIds = tmxb.getAllThreadIds();
889                    int nrOfThreads = threadIds.length;
890                    try{
891                            ATermList threadsList = factory.makeList();
892                            for(int i = 0; i < nrOfThreads; i++){
893                                    long threadId = threadIds[i];
894                                    if(relevantThreadIds.contains(new Long(threadId))){ // Only list the info if we're interested in it.
895                                            ThreadInfo ti = tmxb.getThreadInfo(threadId);
896                                            if(ti != null){
897                                                    String threadName = ti.getThreadName();
898                                                    long userTime = tmxb.getThreadUserTime(threadIds[i]);
899                                                    long systemTime = tmxb.getThreadCpuTime(threadIds[i]) - userTime;
900                                                    
901                                                    if((userTime + systemTime) <= 0) continue;
902                                                    
903                                                    ATerm userTimeTerm = factory.makeAppl(factory.makeAFun("user-time", 1, false), factory.makeInt(((int) (userTime / 1000000))));
904                                                    ATerm systemTimeTerm = factory.makeAppl(factory.makeAFun("system-time", 1, false), factory.makeInt(((int) (systemTime / 1000000))));
905                                                    ATerm thread = factory.makeAppl(factory.makeAFun(threadName, 2, false), userTimeTerm, systemTimeTerm);
906                                                    
907                                                    threadsList = factory.makeList(thread, threadsList);
908                                            }
909                                    }
910                            }
911                            
912                            threads = factory.makeAppl(factory.makeAFun("threads", 1, false), threadsList);
913                    }catch(UnsupportedOperationException uoex){
914                            threads = factory.make("threads(unsupported-operation)");
915                            System.out.println("Thread time profiling is not supported by this JVM.");
916                    }
917                    
918                    return factory.makeAppl(factory.makeAFun("performance-stats", 3, false), tool, memory, threads);
919            }
920    }