001 package toolbus.adapter;
002
003 import java.net.InetAddress;
004 import java.util.HashMap;
005 import java.util.LinkedList;
006 import java.util.List;
007 import java.util.Map;
008
009 import toolbus.DirectConnectionHandler;
010 import toolbus.IOperations;
011 import toolbus.ToolBus;
012 import toolbus.communication.IDataHandler;
013 import toolbus.communication.IIOHandler;
014 import toolbus.exceptions.ToolBusError;
015 import toolbus.logging.ILogger;
016 import toolbus.logging.IToolBusLoggerConstants;
017 import toolbus.logging.LoggerFactory;
018 import toolbus.util.collections.RotatingQueue;
019 import aterm.ATerm;
020 import aterm.ATermAppl;
021 import aterm.ATermList;
022 import aterm.pure.PureFactory;
023
024 /**
025 * This class supplies an interface for the tool towards the ToolBus and handles the invokation of
026 * methods on the tool.
027 *
028 * @author Arnold Lankamp
029 */
030 public abstract class ToolBridge implements IDataHandler, Runnable, IOperations{
031 private final PureFactory termFactory;
032
033 private final Map<Long, ThreadLocalJobQueue> threadLocalQueues;
034 private final Map<String, JobQueue> queues;
035
036 private final WorkerQueue workerQueue;
037
038 private volatile IIOHandler ioHandler;
039
040 private final String type;
041
042 private final String toolName;
043 private int toolID = -1;
044
045 private final InetAddress host;
046 private final int port;
047
048 private final ToolBus toolbus;
049
050 /**
051 * Constructor.
052 *
053 * @param termFactory
054 * The term factory to use.
055 * @param toolName
056 * The name of the with this bridge associated tool.
057 * @param toolID
058 * The id of the with this bridge associated tool.
059 * @param host
060 * The host on which the ToolBus is running.
061 * @param port
062 * The port on which the ToolBus is running.
063 */
064 public ToolBridge(PureFactory termFactory, String toolName, int toolID, InetAddress host, int port){
065 super();
066
067 this.termFactory = termFactory;
068 this.type = AbstractTool.REMOTETOOL;
069 this.toolName = toolName;
070 this.toolID = toolID;
071 this.host = host;
072 this.port = port;
073
074 this.toolbus = null;
075
076 threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
077 queues = new HashMap<String, JobQueue>();
078
079 workerQueue = new WorkerQueue();
080 }
081
082 /**
083 * Constructor.
084 *
085 * @param termFactory
086 * The term factory to use.
087 * @param toolName
088 * The name of the with this bridge associated tool.
089 * @param toolID
090 * The id of the with this bridge associated tool.
091 * @param toolbus
092 * The toolbus to connect to.
093 */
094 public ToolBridge(PureFactory termFactory, String toolName, int toolID, ToolBus toolbus){
095 super();
096
097 this.termFactory = termFactory;
098 this.type = AbstractTool.DIRECTTOOL;
099 this.toolName = toolName;
100 this.toolID = toolID;
101 this.toolbus = toolbus;
102
103 this.host = null;
104 this.port = -1;
105
106 threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
107 queues = new HashMap<String, JobQueue>();
108
109 workerQueue = new WorkerQueue();
110 }
111
112 /**
113 * Returns what type of connection we have with the ToolBus.
114 *
115 * @return What type of connection we have with the ToolBus.
116 */
117 public String getType(){
118 return type;
119 }
120
121 /**
122 * Returns the tool name of the tool that is associated with this tool bridge.
123 *
124 * @return The tool name of the tool that is associated with this tool bridge.
125 */
126 public String getToolName(){
127 return toolName;
128 }
129
130 /**
131 * Sets the tool id to the given values. This method will only be called once, after
132 * successfully connecting to the ToolBus.
133 *
134 * @param toolID
135 * The value the toolID has to be set too.
136 */
137 protected void setToolID(int toolID){
138 this.toolID = toolID;
139 }
140
141 /**
142 * Returns the tool id of the tool that is associated with this tool bridge.
143 *
144 * @return The tool id of the tool that is associated with this tool bridge.
145 */
146 public int getToolID(){
147 return toolID;
148 }
149
150 /**
151 * Returns a reference to the term factory.
152 *
153 * @return A reference to the term factory.
154 */
155 public PureFactory getFactory(){
156 return termFactory;
157 }
158
159 /**
160 * Returns the adress of the host the ToolBus is running on.
161 *
162 * @return The adress of the host the ToolBus is running on. Is null when uninitialized.
163 */
164 protected InetAddress getHost(){
165 return host;
166 }
167
168 /**
169 * Returns the port number the ToolBus is running on.
170 *
171 * @return The port number the ToolBus is running on. Is -1 when uninitialized.
172 */
173 protected int getPort(){
174 return port;
175 }
176
177 /**
178 * Associates an I/O handler with this tool bridge.
179 *
180 * @param ioHandler
181 * The I/O handler to associate this tool bridge with.
182 */
183 public void setIOHandler(IIOHandler ioHandler){
184 this.ioHandler = ioHandler;
185 }
186
187 /**
188 * Checks if the tool associated with this tool bridge supplies the given signature.
189 *
190 * @param signatures
191 * The signature we need to compare the interface of the tool too.
192 * @return True if the tool supplies the expected interface; false otherwise.
193 */
194 public abstract boolean checkSignature(ATerm signature);
195
196 /**
197 * Executes a DO operation with the given ATerm.
198 *
199 * @param aTerm
200 * The ATerm that contains the necessary data to complete the DO request.
201 */
202 public abstract void doDo(ATerm aTerm);
203
204 /**
205 * Executes an EVAL operation with the given ATerm.
206 *
207 * @param aTerm
208 * The ATerm that contains the necessary data to complete the EVAL request.
209 * @return The result the EVAL request produced (may not be null).
210 */
211 public abstract ATerm doEval(ATerm aTerm);
212
213 /**
214 * Executes a RECACKEVENT operation with the given ATerm.
215 *
216 * @param aTerm
217 * The ATerm that can potentially contain callback data.
218 */
219 public abstract void doReceiveAckEvent(ATerm aTerm);
220
221 /**
222 * Executes a TERMINATE operation with the given ATerm.
223 *
224 * @param aTerm
225 * The ATerm that can potentially contain background informantion about the termination request.
226 */
227 public abstract void doTerminate(ATerm aTerm);
228
229 /**
230 * Retrieves performance statistic information from the tool (if possible).
231 *
232 * @return The gathered performance information (may not be null).
233 */
234 public abstract ATerm doGetPerformanceStats();
235
236 /**
237 * @see IDataHandler#send(byte, ATerm)
238 */
239 public void send(byte operation, ATerm aTerm){
240 ioHandler.send(operation, termFactory.importTerm(aTerm));
241 }
242
243 /**
244 * Posts the given event.
245 *
246 * @param aTerm
247 * The event.
248 */
249 public void postEvent(ATerm aTerm){
250 long threadId = Thread.currentThread().getId();
251 ThreadLocalJobQueue threadLocalQueue;
252 synchronized(threadLocalQueues){
253 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
254 if(threadLocalQueue == null){
255 threadLocalQueue = new ThreadLocalJobQueue();
256 threadLocalQueues.put(new Long(threadId), threadLocalQueue);
257 }
258 }
259 threadLocalQueue.postEvent(aTerm, threadId);
260 }
261
262 /**
263 * Posts the given request.
264 *
265 * @param aTerm
266 * The request.
267 */
268 public ATermAppl postRequest(ATerm aTerm){
269 long threadId = Thread.currentThread().getId();
270 ThreadLocalJobQueue threadLocalQueue;
271 synchronized(threadLocalQueues){
272 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
273 if(threadLocalQueue == null){
274 threadLocalQueue = new ThreadLocalJobQueue();
275 threadLocalQueues.put(new Long(threadId), threadLocalQueue);
276 }
277 }
278 Job job = threadLocalQueue.postRequest(aTerm, threadId);
279 return threadLocalQueue.waitForResponse(job);
280 }
281
282 /**
283 * @see IDataHandler#receive(byte, ATerm)
284 */
285 public void receive(byte operation, final ATerm aTerm){
286 switch(operation){
287 case DO:
288 workerQueue.execute(new Runnable(){
289 public void run(){
290 doDo(aTerm);
291 send(ACKDO, termFactory.makeList());
292 }
293 });
294 break;
295 case EVAL:
296 workerQueue.execute(new Runnable(){
297 public void run(){
298 send(VALUE, doEval(aTerm));
299 }
300 });
301 break;
302 case ACKEVENT:
303 ATermList ackEvent = ((ATermList) aTerm);
304 ATerm event = ackEvent.getFirst();
305
306 String ackSourceName = ((ATermAppl) event).getAFun().getName();
307
308 JobQueue eventQueue;
309 synchronized(queues){
310 eventQueue = queues.get(ackSourceName);
311 }
312 if(eventQueue == null){
313 LoggerFactory.log("Received acknowledgement for a non-existent event: " + ackSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL);
314 return;
315 }
316 eventQueue.ackEvent();
317
318 ATerm callBackInfo = ackEvent.elementAt(1);
319 doReceiveAckEvent(callBackInfo);
320 break;
321 case RESPONSE:
322 ATermAppl response = (ATermAppl) aTerm;
323 String responseSourceName = response.getAFun().toString();
324
325 JobQueue requestQueue;
326 synchronized(queues){
327 requestQueue = queues.get(responseSourceName);
328 }
329 if(requestQueue == null){
330 LoggerFactory.log("Received response on a non-existent request: " + responseSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL);
331 return;
332 }
333 requestQueue.recResponse(response);
334 break;
335 case TERMINATE:
336 workerQueue.execute(new Runnable(){
337 public void run(){
338 doTerminate(aTerm);
339 terminate();
340 }
341 });
342 break;
343 case PERFORMANCESTATS:
344 ATerm performanceStats = doGetPerformanceStats();
345 send(PERFORMANCESTATS, performanceStats);
346 break;
347 case DEBUGPERFORMANCESTATS:
348 ATerm debugPerformanceStats = doGetPerformanceStats();
349 send(DEBUGPERFORMANCESTATS, debugPerformanceStats);
350 break;
351 default:
352 LoggerFactory.log("Unkown operation id: " + operation, ILogger.ERROR, IToolBusLoggerConstants.TOOL);
353 }
354 }
355
356 /**
357 * Requests the termination of this tool, so it can perform an orderly shutdown on the toolbus
358 * side.
359 *
360 * @see IDataHandler#terminate()
361 */
362 public void terminate(){
363 ioHandler.terminate();
364 workerQueue.terminate();
365 }
366
367 /**
368 * Informs the tool that it was shut down by the toolbus.
369 *
370 * @see IDataHandler#shutDown()
371 */
372 public void shutDown(){
373 LoggerFactory.log("The connection with the ToolBus was closed. Waiting for the tool to shut down ....", ILogger.INFO, IToolBusLoggerConstants.TOOL);
374 }
375
376 /**
377 * @see IDataHandler#exceptionOccured()
378 */
379 public void exceptionOccured(){
380 LoggerFactory.log("Lost connection with the ToolBus. Initiating ungraceful shutdown ....", ILogger.FATAL, IToolBusLoggerConstants.TOOL);
381
382 System.exit(0);
383 }
384
385 /**
386 * Executes the ToolBridge.
387 *
388 * @see Runnable#run()
389 */
390 public void run(){
391 // Initialize the worker queue.
392 workerQueue.start();
393
394 // Only start the connection stuff if we haven't been linked to an I/O handler yet.
395 if(type.equals(AbstractTool.REMOTETOOL)){
396 if(host == null || port == -1){
397 LoggerFactory.log("Dunno where the ToolBus is running.", ILogger.FATAL, IToolBusLoggerConstants.TOOL);
398 throw new RuntimeException("Dunno where the ToolBus is running.");
399 }
400
401 ToolConnectionHandler toolConnectionHandler = new ToolConnectionHandler(this, host, port);
402 toolConnectionHandler.run();
403 }else if(type.equals(AbstractTool.DIRECTTOOL)){
404 DirectConnectionHandler directConnectionHandler = toolbus.getDirectConnectionHandler();
405 try{
406 directConnectionHandler.dock(this);
407 }catch(ToolBusError tberr){
408 throw new RuntimeException(tberr);
409 }
410 }else{
411 String error = "Unknown tool type: " + type;
412 LoggerFactory.log(error, ILogger.FATAL, IToolBusLoggerConstants.TOOL);
413 throw new RuntimeException(error);
414 }
415 // Send a connect message to the ToolBus.
416 send(CONNECT, termFactory.makeAppl(termFactory.makeAFun(toolName, 1, false), termFactory.makeInt(toolID)));
417 }
418
419 /**
420 * A job.
421 *
422 * @author Arnold Lankamp
423 */
424 private static class Job{
425 public final byte operation;
426 public final ATerm term;
427 public final long threadId;
428 public ATermAppl response; // Optional field
429
430 /**
431 * Constructor.
432 *
433 * @param operation
434 * The op-code.
435 * @param term
436 * The message associated with this event.
437 * @param threadId
438 * The id of the thread associated with this event.
439 */
440 public Job(byte operation, ATerm term, long threadId){
441 super();
442
443 this.operation = operation;
444 this.term = term;
445 this.threadId = threadId;
446 }
447 }
448
449 /**
450 * This job queue holds all the jobs that are send from a single source.
451 *
452 * @author Arnold Lankamp
453 */
454 private class JobQueue{
455 private final List<Job> jobs;
456 private Job current;
457
458 /**
459 * Default constructor.
460 */
461 public JobQueue(){
462 super();
463
464 jobs = new LinkedList<Job>();
465
466 current = null;
467 }
468
469 /**
470 * Schedules the given job for transmission to the ToolBus. If there are currently no
471 * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till
472 * all the requests (of the same source) that where previously scheduled have been send and
473 * acknowledged.
474 *
475 * @param job
476 * A container that hold the details about the request.
477 */
478 public synchronized void post(Job job){
479 if(current == null){
480 ioHandler.send(job.operation, job.term);
481 current = job;
482 }else{
483 jobs.add(job);
484 }
485 }
486
487 /**
488 * Returns the next job in the queue.
489 *
490 * @return The next job in the queue; null if the queue is empty.
491 */
492 public synchronized Job getNext(){
493 if(!jobs.isEmpty()) return jobs.remove(0);
494
495 return null;
496 }
497
498 /**
499 * Notifies the thread local queue of the acknowledgement and executes the next queued job
500 * (if present).
501 */
502 private void acknowledge(){
503 long threadId = current.threadId;
504 ThreadLocalJobQueue threadLocalQueue;
505 synchronized(threadLocalQueues){
506 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
507 }
508 threadLocalQueue.acknowledge();
509
510 Job next = getNext();
511 current = next;
512 if(next != null){
513 ioHandler.send(next.operation, next.term);
514 }
515 }
516
517 /**
518 * Acknowledges the last event that was send from the source this queue is associated with.
519 * It will execute the next job in the queue if there are any.
520 */
521 public synchronized void ackEvent(){
522 acknowledge();
523 }
524
525 /**
526 * Acknowledges the last request that was send from the source this queue is associated
527 * with. It will execute the next job in the queue if there are any.
528 *
529 * @param response
530 * The response.
531 */
532 public synchronized void recResponse(ATermAppl response){
533 synchronized(current){
534 current.response = response;
535 current.notify();
536 }
537
538 acknowledge();
539 }
540 }
541
542 /**
543 * This job queue holds all the jobs that are posted by a certain thread.
544 *
545 * @author Arnold Lankamp
546 */
547 private class ThreadLocalJobQueue{
548 private final List<Job> requests;
549
550 private boolean awaitingAck;
551
552 /**
553 * Default constructor.
554 */
555 public ThreadLocalJobQueue(){
556 super();
557
558 requests = new LinkedList<Job>();
559 }
560
561 /**
562 * Schedules the given event for transmission to the ToolBus. If there are currently no
563 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
564 * to wait till all the jobs (associated with the current thread) that were previously
565 * scheduled have been submitted to the request queue.
566 *
567 * @param aTerm
568 * The term that hold the details about the event.
569 * @param threadId
570 * The id of the thread associated with the event.
571 */
572 public synchronized void postEvent(ATerm aTerm, long threadId){
573 Job request = new Job(EVENT, aTerm, threadId);
574
575 if(!awaitingAck){
576 String sourceName = ((ATermAppl) aTerm).getAFun().getName();
577
578 JobQueue requestQueue;
579 synchronized(queues){
580 requestQueue = queues.get(sourceName);
581 if(requestQueue == null){
582 requestQueue = new JobQueue();
583 queues.put(sourceName, requestQueue);
584 }
585 }
586 requestQueue.post(request);
587 awaitingAck = true;
588 }else{
589 requests.add(request);
590 }
591 }
592
593 /**
594 * Schedules the given request for transmission to the ToolBus. If there are currently no
595 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
596 * to wait till all the jobs (associated with the current thread) that were previously
597 * scheduled have been submitted to the request queue.
598 *
599 * @param aTerm
600 * The term that hold the details about the request.
601 * @param threadId
602 * The id of the thread associated with the request.
603 * @return The received response on the issued request.
604 */
605 public synchronized Job postRequest(ATerm aTerm, long threadId){
606 Job job = new Job(REQUEST, aTerm, threadId);
607 if(!awaitingAck){
608 String sourceName = ((ATermAppl) aTerm).getAFun().getName();
609
610 JobQueue requestQueue;
611 synchronized(queues){
612 requestQueue = queues.get(sourceName);
613 if(requestQueue == null){
614 requestQueue = new JobQueue();
615 queues.put(sourceName, requestQueue);
616 }
617 }
618 requestQueue.post(job);
619 awaitingAck = true;
620 }else{
621 requests.add(job);
622 }
623 return job;
624 }
625
626 public ATermAppl waitForResponse(Job job){
627 synchronized(job){
628 while(job.response == null){
629 try{
630 job.wait();
631 }catch(InterruptedException irex){
632 // Ignore this, since I don't want to know about it.
633 }
634 }
635 }
636 return job.response;
637 }
638
639 /**
640 * Returns the next job in the queue.
641 *
642 * @return The next job in the queue; null if the queue is empty.
643 */
644 public synchronized Job getNext(){
645 if(!requests.isEmpty()) return requests.remove(0);
646
647 return null;
648 }
649
650 /**
651 * Acknowledges the last job that was send from the source the current thread is associated
652 * with. It will submit the next job in the queue if there are any.
653 */
654 public synchronized void acknowledge(){
655 Job next = getNext();
656 if(next == null){
657 awaitingAck = false;
658 }else{
659 ATerm term = next.term;
660 String sourceName = ((ATermAppl) term).getAFun().getName();
661
662 JobQueue requestQueue;
663 synchronized(queues){
664 requestQueue = queues.get(sourceName);
665 if(requestQueue == null){
666 requestQueue = new JobQueue();
667 queues.put(sourceName, requestQueue);
668 }
669 }
670
671 requestQueue.post(next);
672 }
673 }
674 }
675
676 /**
677 * The queue that is meant to take care of the asynchroneous execution and queueing of anything
678 * that invokes stuff on a tool.
679 *
680 * @author Arnold Lankamp
681 */
682 private static class WorkerQueue{
683 private final Object lock = new Object();
684
685 private final RotatingQueue<Runnable> queue;
686 private final Worker worker;
687
688 /**
689 * Default constructor.
690 */
691 public WorkerQueue(){
692 super();
693
694 ThreadGroup toolGroup = new ThreadGroup("ToolGroup");
695 queue = new RotatingQueue<Runnable>();
696 worker = new Worker(toolGroup);
697
698 worker.setDaemon(true);
699 }
700
701 /**
702 * Starts the worker thread.
703 */
704 public void start(){
705 worker.start();
706 }
707
708 /**
709 * Executes or queues the given runnable for execution.
710 *
711 * @param r
712 * The runnable containing the stuff that needs to be executed.
713 */
714 public void execute(Runnable r){
715 synchronized(lock){
716 boolean notBusy = queue.isEmpty();
717 queue.put(r);
718 if(notBusy) lock.notify();
719 }
720 }
721
722 /**
723 * Terminates the worker thread as soon as is gracefully possible.
724 */
725 public void terminate(){
726 worker.terminate();
727 }
728
729 /**
730 * The worker thread of this queue.
731 *
732 * @author Arnold Lankamp
733 */
734 private class Worker extends Thread{
735 private volatile boolean running;
736
737 /**
738 * Default constructor.
739 * @param threadGroup
740 * The thread group to join.
741 */
742 public Worker(ThreadGroup threadGroup){
743 super(threadGroup, "Worker");
744
745 running = true;
746 }
747
748 /**
749 * The main execution loop.
750 */
751 public void run(){
752 do{
753 Runnable r;
754 synchronized(lock){
755 r = queue.get();
756 while(r == null){
757 try{
758 lock.wait();
759 }catch(InterruptedException irex){
760 // Ignore this.
761 }
762 if(!running) return;
763
764 r = queue.get();
765 }
766 }
767
768 r.run();
769 }while(running);
770 }
771
772 /**
773 * Terminates this worker thread after the current iteration.
774 */
775 public void terminate(){
776 running = false;
777 synchronized(lock){
778 lock.notify();
779 }
780 }
781 }
782 }
783 }