001    package toolbus.adapter;
002    
003    import java.net.InetAddress;
004    import java.util.HashMap;
005    import java.util.LinkedList;
006    import java.util.List;
007    import java.util.Map;
008    
009    import toolbus.DirectConnectionHandler;
010    import toolbus.IOperations;
011    import toolbus.ToolBus;
012    import toolbus.communication.IDataHandler;
013    import toolbus.communication.IIOHandler;
014    import toolbus.exceptions.ToolBusError;
015    import toolbus.logging.ILogger;
016    import toolbus.logging.IToolBusLoggerConstants;
017    import toolbus.logging.LoggerFactory;
018    import toolbus.util.collections.RotatingQueue;
019    import aterm.ATerm;
020    import aterm.ATermAppl;
021    import aterm.ATermList;
022    import aterm.pure.PureFactory;
023    
024    /**
 * This class supplies an interface for the tool towards the ToolBus and handles the invocation of
 * methods on the tool.
027     * 
028     * @author Arnold Lankamp
029     */
030    public abstract class ToolBridge implements IDataHandler, Runnable, IOperations{
        // Factory used to build and import the terms that pass through this bridge.
        private final PureFactory termFactory;

        // Per-thread job queues, keyed by thread id; accessed while synchronizing on the map itself.
        private final Map<Long, ThreadLocalJobQueue> threadLocalQueues;
        // Per-source job queues, keyed by the function name of the posted term; accessed while
        // synchronizing on the map itself.
        private final Map<String, JobQueue> queues;
        
        // Receives the DO, EVAL and TERMINATE work that receive() hands off.
        private final WorkerQueue workerQueue;

        // Channel to the ToolBus; assigned through setIOHandler().
        private volatile IIOHandler ioHandler;

        // Connection type: AbstractTool.REMOTETOOL or AbstractTool.DIRECTTOOL, fixed by the constructor used.
        private final String type;

        // Name and id of the tool this bridge belongs to; the id can be replaced through setToolID().
        private final String toolName;
        private int toolID = -1;

        // Location of the ToolBus for remote tools; null and -1 for direct tools.
        private final InetAddress host;
        private final int port;
        
        // ToolBus instance for direct tools; null for remote tools.
        private final ToolBus toolbus;
049    
050            /**
051             * Constructor.
052             * 
053             * @param termFactory
054             *            The term factory to use.
055             * @param toolName
056             *            The name of the with this bridge associated tool.
057             * @param toolID
058             *            The id of the with this bridge associated tool.
059             * @param host
060             *            The host on which the ToolBus is running.
061             * @param port
062             *            The port on which the ToolBus is running.
063             */
064            public ToolBridge(PureFactory termFactory, String toolName, int toolID, InetAddress host, int port){
065                    super();
066                    
067                    this.termFactory = termFactory;
068                    this.type = AbstractTool.REMOTETOOL;
069                    this.toolName = toolName;
070                    this.toolID = toolID;
071                    this.host = host;
072                    this.port = port;
073                    
074                    this.toolbus = null;
075    
076                    threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
077                    queues = new HashMap<String, JobQueue>();
078                    
079                    workerQueue = new WorkerQueue();
080            }
081            
082            /**
083             * Constructor.
084             * 
085             * @param termFactory
086             *            The term factory to use.
087             * @param toolName
088             *            The name of the with this bridge associated tool.
089             * @param toolID
090             *            The id of the with this bridge associated tool.
091             * @param toolbus
092             *            The toolbus to connect to.
093             */
094            public ToolBridge(PureFactory termFactory, String toolName, int toolID, ToolBus toolbus){
095                    super();
096                    
097                    this.termFactory = termFactory;
098                    this.type = AbstractTool.DIRECTTOOL;
099                    this.toolName = toolName;
100                    this.toolID = toolID;
101                    this.toolbus = toolbus;
102                    
103                    this.host = null;
104                    this.port = -1;
105    
106                    threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
107                    queues = new HashMap<String, JobQueue>();
108                    
109                    workerQueue = new WorkerQueue();
110            }
111            
112            /**
113             * Returns what type of connection we have with the ToolBus.
114             * 
115             * @return What type of connection we have with the ToolBus.
116             */
117            public String getType(){
118                    return type;
119            }
120    
121            /**
122             * Returns the tool name of the tool that is associated with this tool bridge.
123             * 
124             * @return The tool name of the tool that is associated with this tool bridge.
125             */
126            public String getToolName(){
127                    return toolName;
128            }
129    
130            /**
131             * Sets the tool id to the given values. This method will only be called once, after
132             * successfully connecting to the ToolBus.
133             * 
134             * @param toolID
135             *            The value the toolID has to be set too.
136             */
137            protected void setToolID(int toolID){
138                    this.toolID = toolID;
139            }
140    
141            /**
142             * Returns the tool id of the tool that is associated with this tool bridge.
143             * 
144             * @return The tool id of the tool that is associated with this tool bridge.
145             */
146            public int getToolID(){
147                    return toolID;
148            }
149    
150            /**
151             * Returns a reference to the term factory.
152             * 
153             * @return A reference to the term factory.
154             */
155            public PureFactory getFactory(){
156                    return termFactory;
157            }
158    
159            /**
160             * Returns the adress of the host the ToolBus is running on.
161             * 
162             * @return The adress of the host the ToolBus is running on. Is null when uninitialized.
163             */
164            protected InetAddress getHost(){
165                    return host;
166            }
167    
168            /**
169             * Returns the port number the ToolBus is running on.
170             * 
171             * @return The port number the ToolBus is running on. Is -1 when uninitialized.
172             */
173            protected int getPort(){
174                    return port;
175            }
176            
177            /**
178             * Associates an I/O handler with this tool bridge.
179             * 
180             * @param ioHandler
181             *            The I/O handler to associate this tool bridge with.
182             */
183            public void setIOHandler(IIOHandler ioHandler){
184                    this.ioHandler = ioHandler;
185            }
186            
187            /**
188             * Checks if the tool associated with this tool bridge supplies the given signature.
189             * 
190             * @param signatures
191             *            The signature we need to compare the interface of the tool too.
192             * @return True if the tool supplies the expected interface; false otherwise.
193             */
194            public abstract boolean checkSignature(ATerm signature);
195            
196            /**
197             * Executes a DO operation with the given ATerm.
198             * 
199             * @param aTerm
200             *            The ATerm that contains the necessary data to complete the DO request.
201             */
202            public abstract void doDo(ATerm aTerm);
203            
204            /**
205             * Executes an EVAL operation with the given ATerm.
206             * 
207             * @param aTerm
208             *            The ATerm that contains the necessary data to complete the EVAL request.
209             * @return The result the EVAL request produced (may not be null).
210             */
211            public abstract ATerm doEval(ATerm aTerm);
212            
213            /**
214             * Executes a RECACKEVENT operation with the given ATerm.
215             * 
216             * @param aTerm
217             *            The ATerm that can potentially contain callback data.
218             */
219            public abstract void doReceiveAckEvent(ATerm aTerm);
220            
221            /**
222             * Executes a TERMINATE operation with the given ATerm.
223             * 
224             * @param aTerm
225             *            The ATerm that can potentially contain background informantion about the termination request.
226             */
227            public abstract void doTerminate(ATerm aTerm);
228            
229            /**
230             * Retrieves performance statistic information from the tool (if possible).
231             * 
232             * @return The gathered performance information (may not be null).
233             */
234            public abstract ATerm doGetPerformanceStats();
235            
236            /**
237             * @see IDataHandler#send(byte, ATerm)
238             */
239            public void send(byte operation, ATerm aTerm){
240                    ioHandler.send(operation, termFactory.importTerm(aTerm));
241            }
242            
243            /**
244             * Posts the given event.
245             * 
246             * @param aTerm
247             *            The event.
248             */
249            public void postEvent(ATerm aTerm){
250                    long threadId = Thread.currentThread().getId();
251                    ThreadLocalJobQueue threadLocalQueue;
252                    synchronized(threadLocalQueues){
253                            threadLocalQueue = threadLocalQueues.get(new Long(threadId));
254                            if(threadLocalQueue == null){
255                                    threadLocalQueue = new ThreadLocalJobQueue();
256                                    threadLocalQueues.put(new Long(threadId), threadLocalQueue);
257                            }
258                    }
259                    threadLocalQueue.postEvent(aTerm, threadId);
260            }
261            
262            /**
263             * Posts the given request.
264             * 
265             * @param aTerm
266             *            The request.
267             */
268            public ATermAppl postRequest(ATerm aTerm){
269                    long threadId = Thread.currentThread().getId();
270                    ThreadLocalJobQueue threadLocalQueue;
271                    synchronized(threadLocalQueues){
272                            threadLocalQueue = threadLocalQueues.get(new Long(threadId));
273                            if(threadLocalQueue == null){
274                                    threadLocalQueue = new ThreadLocalJobQueue();
275                                    threadLocalQueues.put(new Long(threadId), threadLocalQueue);
276                            }
277                    }
278                    Job job = threadLocalQueue.postRequest(aTerm, threadId);
279                    return threadLocalQueue.waitForResponse(job);
280            }
281    
282            /**
283             * @see IDataHandler#receive(byte, ATerm)
284             */
285            public void receive(byte operation, final ATerm aTerm){
286                    switch(operation){
287                            case DO:
288                                    workerQueue.execute(new Runnable(){
289                                            public void run(){
290                                                    doDo(aTerm);
291                                                    send(ACKDO, termFactory.makeList());
292                                            }
293                                    });
294                                    break;
295                            case EVAL:
296                                    workerQueue.execute(new Runnable(){
297                                            public void run(){
298                                                    send(VALUE, doEval(aTerm));
299                                            }
300                                    });
301                                    break;
302                            case ACKEVENT:
303                                    ATermList ackEvent = ((ATermList) aTerm);
304                                    ATerm event = ackEvent.getFirst();
305                                    
306                                    String ackSourceName = ((ATermAppl) event).getAFun().getName();
307                                    
308                                    JobQueue eventQueue;
309                                    synchronized(queues){
310                                            eventQueue = queues.get(ackSourceName);
311                                    }
312                                    if(eventQueue == null){
313                                            LoggerFactory.log("Received acknowledgement for a non-existent event: " + ackSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL);
314                                            return;
315                                    }
316                                    eventQueue.ackEvent();
317                                    
318                                    ATerm callBackInfo = ackEvent.elementAt(1);
319                                    doReceiveAckEvent(callBackInfo);
320                                    break;
321                            case RESPONSE:
322                                    ATermAppl response = (ATermAppl) aTerm;
323                                    String responseSourceName = response.getAFun().toString();
324                                    
325                                    JobQueue requestQueue;
326                                    synchronized(queues){
327                                            requestQueue = queues.get(responseSourceName);
328                                    }
329                                    if(requestQueue == null){
330                                            LoggerFactory.log("Received response on a non-existent request: " + responseSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL);
331                                            return;
332                                    }
333                                    requestQueue.recResponse(response);
334                                    break;
335                            case TERMINATE:
336                                    workerQueue.execute(new Runnable(){
337                                            public void run(){
338                                                    doTerminate(aTerm);
339                                                    terminate();
340                                            }
341                                    });
342                                    break;
343                            case PERFORMANCESTATS:
344                                    ATerm performanceStats = doGetPerformanceStats();
345                                    send(PERFORMANCESTATS, performanceStats);
346                                    break;
347                            case DEBUGPERFORMANCESTATS:
348                                    ATerm debugPerformanceStats = doGetPerformanceStats();
349                                    send(DEBUGPERFORMANCESTATS, debugPerformanceStats);
350                                    break;
351                            default:
352                                    LoggerFactory.log("Unkown operation id: " + operation, ILogger.ERROR, IToolBusLoggerConstants.TOOL);
353                    }
354            }
355    
356            /**
357             * Requests the termination of this tool, so it can perform an orderly shutdown on the toolbus
358             * side.
359             * 
360             * @see IDataHandler#terminate()
361             */
362            public void terminate(){
363                    ioHandler.terminate();
364                    workerQueue.terminate();
365            }
366    
367            /**
368             * Informs the tool that it was shut down by the toolbus.
369             * 
370             * @see IDataHandler#shutDown()
371             */
372            public void shutDown(){
373                    LoggerFactory.log("The connection with the ToolBus was closed. Waiting for the tool to shut down ....", ILogger.INFO, IToolBusLoggerConstants.TOOL);
374            }
375            
376            /**
377             * @see IDataHandler#exceptionOccured()
378             */
379            public void exceptionOccured(){
380                    LoggerFactory.log("Lost connection with the ToolBus. Initiating ungraceful shutdown ....", ILogger.FATAL, IToolBusLoggerConstants.TOOL);
381                    
382                    System.exit(0);
383            }
384    
385            /**
386             * Executes the ToolBridge.
387             * 
388             * @see Runnable#run()
389             */
390            public void run(){
391                    // Initialize the worker queue.
392                    workerQueue.start();
393                    
394                    // Only start the connection stuff if we haven't been linked to an I/O handler yet.
395                    if(type.equals(AbstractTool.REMOTETOOL)){
396                            if(host == null || port == -1){
397                                    LoggerFactory.log("Dunno where the ToolBus is running.", ILogger.FATAL, IToolBusLoggerConstants.TOOL);
398                                    throw new RuntimeException("Dunno where the ToolBus is running.");
399                            }
400                            
401                            ToolConnectionHandler toolConnectionHandler = new ToolConnectionHandler(this, host, port);
402                            toolConnectionHandler.run();
403                    }else if(type.equals(AbstractTool.DIRECTTOOL)){
404                            DirectConnectionHandler directConnectionHandler = toolbus.getDirectConnectionHandler();
405                            try{
406                                    directConnectionHandler.dock(this);
407                            }catch(ToolBusError tberr){
408                                    throw new RuntimeException(tberr);
409                            }
410                    }else{
411                            String error = "Unknown tool type: " + type;
412                            LoggerFactory.log(error, ILogger.FATAL, IToolBusLoggerConstants.TOOL);
413                            throw new RuntimeException(error);
414                    }
415                    // Send a connect message to the ToolBus.
416                    send(CONNECT, termFactory.makeAppl(termFactory.makeAFun(toolName, 1, false), termFactory.makeInt(toolID)));
417            }
418            
419            /**
420             * A job.
421             * 
422             * @author Arnold Lankamp
423             */
424            private static class Job{
425                    public final byte operation;
426                    public final ATerm term;
427                    public final long threadId;
428                    public ATermAppl response; // Optional field
429                    
430                    /**
431                     * Constructor.
432                     * 
433                     * @param operation
434                     *            The op-code.
435                     * @param term
436                     *            The message associated with this event.
437                     * @param threadId
438                     *            The id of the thread associated with this event.
439                     */
440                    public Job(byte operation, ATerm term, long threadId){
441                            super();
442                            
443                            this.operation = operation;
444                            this.term = term;
445                            this.threadId = threadId;
446                    }
447            }
448    
449            /**
450             * This job queue holds all the jobs that are send from a single source.
451             * 
452             * @author Arnold Lankamp
453             */
454            private class JobQueue{
455                    private final List<Job> jobs;
456                    private Job current;
457    
458                    /**
459                     * Default constructor.
460                     */
461                    public JobQueue(){
462                            super();
463                            
464                            jobs = new LinkedList<Job>();
465                            
466                            current = null;
467                    }
468                    
469                    /**
470                     * Schedules the given job for transmission to the ToolBus. If there are currently no
471                     * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till
472                     * all the requests (of the same source) that where previously scheduled have been send and
473                     * acknowledged.
474                     * 
475                     * @param job
476                     *            A container that hold the details about the request.
477                     */
478                    public synchronized void post(Job job){
479                            if(current == null){
480                                    ioHandler.send(job.operation, job.term);
481                                    current = job;
482                            }else{
483                                    jobs.add(job);
484                            }
485                    }
486    
487                    /**
488                     * Returns the next job in the queue.
489                     * 
490                     * @return The next job in the queue; null if the queue is empty.
491                     */
492                    public synchronized Job getNext(){
493                            if(!jobs.isEmpty()) return jobs.remove(0);
494                            
495                            return null;
496                    }
497                    
498                    /**
499                     * Notifies the thread local queue of the acknowledgement and executes the next queued job
500                     * (if present).
501                     */
502                    private void acknowledge(){
503                            long threadId = current.threadId;
504                            ThreadLocalJobQueue threadLocalQueue;
505                            synchronized(threadLocalQueues){
506                                    threadLocalQueue = threadLocalQueues.get(new Long(threadId));
507                            }
508                            threadLocalQueue.acknowledge();
509                            
510                            Job next = getNext();
511                            current = next;
512                            if(next != null){
513                                    ioHandler.send(next.operation, next.term);
514                            }
515                    }
516    
517                    /**
518                     * Acknowledges the last event that was send from the source this queue is associated with.
519                     * It will execute the next job in the queue if there are any.
520                     */
521                    public synchronized void ackEvent(){
522                            acknowledge();
523                    }
524                    
525                    /**
526                     * Acknowledges the last request that was send from the source this queue is associated
527                     * with. It will execute the next job in the queue if there are any.
528                     * 
529                     * @param response
530                     *            The response.
531                     */
532                    public synchronized void recResponse(ATermAppl response){
533                            synchronized(current){
534                                    current.response = response;
535                                    current.notify();
536                            }
537                            
538                            acknowledge();
539                    }
540            }
541            
542            /**
543             * This job queue holds all the jobs that are posted by a certain thread.
544             * 
545             * @author Arnold Lankamp
546             */
547            private class ThreadLocalJobQueue{
548                    private final List<Job> requests;
549                    
550                    private boolean awaitingAck;
551    
552                    /**
553                     * Default constructor.
554                     */
555                    public ThreadLocalJobQueue(){
556                            super();
557                            
558                            requests = new LinkedList<Job>();
559                    }
560                    
561                    /**
562                     * Schedules the given event for transmission to the ToolBus. If there are currently no
563                     * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
564                     * to wait till all the jobs (associated with the current thread) that were previously
565                     * scheduled have been submitted to the request queue.
566                     * 
567                     * @param aTerm
568                     *            The term that hold the details about the event.
569                     * @param threadId
570                     *            The id of the thread associated with the event.
571                     */
572                    public synchronized void postEvent(ATerm aTerm, long threadId){
573                            Job request = new Job(EVENT, aTerm, threadId);
574                            
575                            if(!awaitingAck){
576                                    String sourceName = ((ATermAppl) aTerm).getAFun().getName();
577                                    
578                                    JobQueue requestQueue;
579                                    synchronized(queues){
580                                            requestQueue = queues.get(sourceName);
581                                            if(requestQueue == null){
582                                                    requestQueue = new JobQueue();
583                                                    queues.put(sourceName, requestQueue);
584                                            }
585                                    }
586                                    requestQueue.post(request);
587                                    awaitingAck = true;
588                            }else{
589                                    requests.add(request);
590                            }
591                    }
592                    
593                    /**
594                     * Schedules the given request for transmission to the ToolBus. If there are currently no
595                     * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
596                     * to wait till all the jobs (associated with the current thread) that were previously
597                     * scheduled have been submitted to the request queue.
598                     * 
599                     * @param aTerm
600                     *            The term that hold the details about the request.
601                     * @param threadId
602                     *            The id of the thread associated with the request.
603                     * @return The received response on the issued request.
604                     */
605                    public synchronized Job postRequest(ATerm aTerm, long threadId){
606                            Job job = new Job(REQUEST, aTerm, threadId);
607                            if(!awaitingAck){
608                                    String sourceName = ((ATermAppl) aTerm).getAFun().getName();
609                                    
610                                    JobQueue requestQueue;
611                                    synchronized(queues){
612                                            requestQueue = queues.get(sourceName);
613                                            if(requestQueue == null){
614                                                    requestQueue = new JobQueue();
615                                                    queues.put(sourceName, requestQueue);
616                                            }
617                                    }
618                                    requestQueue.post(job);
619                                    awaitingAck = true;
620                            }else{
621                                    requests.add(job);
622                                    }
623                            return job;
624                    }
625                    
626                    public ATermAppl waitForResponse(Job job){
627                            synchronized(job){
628                                    while(job.response == null){
629                                            try{
630                                                    job.wait();
631                                            }catch(InterruptedException irex){
632                                                    // Ignore this, since I don't want to know about it.
633                                            }
634                                    }
635                            }
636                            return job.response;
637                    }
638                    
639                    /**
640                     * Returns the next job in the queue.
641                     * 
642                     * @return The next job in the queue; null if the queue is empty.
643                     */
644                    public synchronized Job getNext(){
645                            if(!requests.isEmpty()) return requests.remove(0);
646                            
647                            return null;
648                    }
649                    
650                    /**
651                     * Acknowledges the last job that was send from the source the current thread is associated
652                     * with. It will submit the next job in the queue if there are any.
653                     */
654                    public synchronized void acknowledge(){
655                            Job next = getNext();
656                            if(next == null){
657                                    awaitingAck = false;
658                            }else{
659                                    ATerm term = next.term;
660                                    String sourceName = ((ATermAppl) term).getAFun().getName();
661                                    
662                                    JobQueue requestQueue;
663                                    synchronized(queues){
664                                            requestQueue = queues.get(sourceName);
665                                            if(requestQueue == null){
666                                                    requestQueue = new JobQueue();
667                                                    queues.put(sourceName, requestQueue);
668                                            }
669                                    }
670                                    
671                                    requestQueue.post(next);
672                            }
673                    }
674            }
675            
676            /**
677             * The queue that is meant to take care of the asynchroneous execution and queueing of anything
678             * that invokes stuff on a tool.
679             * 
680             * @author Arnold Lankamp
681             */
682            private static class WorkerQueue{
683                    private final Object lock = new Object();
684                    
685                    private final RotatingQueue<Runnable> queue;
686                    private final Worker worker;
687                    
688                    /**
689                     * Default constructor.
690                     */
691                    public WorkerQueue(){
692                            super();
693                            
694                            ThreadGroup toolGroup = new ThreadGroup("ToolGroup");
695                            queue = new RotatingQueue<Runnable>();
696                            worker = new Worker(toolGroup);
697                            
698                            worker.setDaemon(true);
699                    }
700                    
701                    /**
702                     * Starts the worker thread.
703                     */
704                    public void start(){
705                            worker.start();
706                    }
707                    
708                    /**
709                     * Executes or queues the given runnable for execution.
710                     * 
711                     * @param r
712                     *            The runnable containing the stuff that needs to be executed.
713                     */
714                    public void execute(Runnable r){
715                            synchronized(lock){
716                                    boolean notBusy = queue.isEmpty();
717                                    queue.put(r);
718                                    if(notBusy) lock.notify();
719                            }
720                    }
721                    
722                    /**
723                     * Terminates the worker thread as soon as is gracefully possible.
724                     */
725                    public void terminate(){
726                            worker.terminate();
727                    }
728                    
729                    /**
730                     * The worker thread of this queue.
731                     * 
732                     * @author Arnold Lankamp
733                     */
734                    private class Worker extends Thread{
735                            private volatile boolean running;
736                            
737                            /**
738                             * Default constructor.
739                             * @param threadGroup
740                             *            The thread group to join.
741                             */
742                            public Worker(ThreadGroup threadGroup){
743                                    super(threadGroup, "Worker");
744                                    
745                                    running = true;
746                            }
747                            
748                            /**
749                             * The main execution loop.
750                             */
751                            public void run(){
752                                    do{
753                                            Runnable r;
754                                            synchronized(lock){
755                                                    r = queue.get();
756                                                    while(r == null){
757                                                            try{
758                                                                    lock.wait();
759                                                            }catch(InterruptedException irex){
760                                                                    // Ignore this.
761                                                            }
762                                                            if(!running) return;
763                                                            
764                                                            r = queue.get();
765                                                    }
766                                            }
767                                            
768                                            r.run();
769                                    }while(running);
770                            }
771                            
772                            /**
773                             * Terminates this worker thread after the current iteration.
774                             */
775                            public void terminate(){
776                                    running = false;
777                                    synchronized(lock){
778                                            lock.notify();
779                                    }
780                            }
781                    }
782            }
783    }