001 package toolbus.adapter; 002 003 import java.net.InetAddress; 004 import java.util.HashMap; 005 import java.util.LinkedList; 006 import java.util.List; 007 import java.util.Map; 008 009 import toolbus.DirectConnectionHandler; 010 import toolbus.IOperations; 011 import toolbus.ToolBus; 012 import toolbus.communication.IDataHandler; 013 import toolbus.communication.IIOHandler; 014 import toolbus.exceptions.ToolBusError; 015 import toolbus.logging.ILogger; 016 import toolbus.logging.IToolBusLoggerConstants; 017 import toolbus.logging.LoggerFactory; 018 import toolbus.util.collections.RotatingQueue; 019 import aterm.ATerm; 020 import aterm.ATermAppl; 021 import aterm.ATermList; 022 import aterm.pure.PureFactory; 023 024 /** 025 * This class supplies an interface for the tool towards the ToolBus and handles the invokation of 026 * methods on the tool. 027 * 028 * @author Arnold Lankamp 029 */ 030 public abstract class ToolBridge implements IDataHandler, Runnable, IOperations{ 031 private final PureFactory termFactory; 032 033 private final Map<Long, ThreadLocalJobQueue> threadLocalQueues; 034 private final Map<String, JobQueue> queues; 035 036 private final WorkerQueue workerQueue; 037 038 private volatile IIOHandler ioHandler; 039 040 private final String type; 041 042 private final String toolName; 043 private int toolID = -1; 044 045 private final InetAddress host; 046 private final int port; 047 048 private final ToolBus toolbus; 049 050 /** 051 * Constructor. 052 * 053 * @param termFactory 054 * The term factory to use. 055 * @param toolName 056 * The name of the with this bridge associated tool. 057 * @param toolID 058 * The id of the with this bridge associated tool. 059 * @param host 060 * The host on which the ToolBus is running. 061 * @param port 062 * The port on which the ToolBus is running. 063 */ 064 public ToolBridge(PureFactory termFactory, String toolName, int toolID, InetAddress host, int port){ 065 super(); 066 067 this.termFactory = termFactory; 068 this.type = AbstractTool.REMOTETOOL; 069 this.toolName = toolName; 070 this.toolID = toolID; 071 this.host = host; 072 this.port = port; 073 074 this.toolbus = null; 075 076 threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>(); 077 queues = new HashMap<String, JobQueue>(); 078 079 workerQueue = new WorkerQueue(); 080 } 081 082 /** 083 * Constructor. 084 * 085 * @param termFactory 086 * The term factory to use. 087 * @param toolName 088 * The name of the with this bridge associated tool. 089 * @param toolID 090 * The id of the with this bridge associated tool. 091 * @param toolbus 092 * The toolbus to connect to. 093 */ 094 public ToolBridge(PureFactory termFactory, String toolName, int toolID, ToolBus toolbus){ 095 super(); 096 097 this.termFactory = termFactory; 098 this.type = AbstractTool.DIRECTTOOL; 099 this.toolName = toolName; 100 this.toolID = toolID; 101 this.toolbus = toolbus; 102 103 this.host = null; 104 this.port = -1; 105 106 threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>(); 107 queues = new HashMap<String, JobQueue>(); 108 109 workerQueue = new WorkerQueue(); 110 } 111 112 /** 113 * Returns what type of connection we have with the ToolBus. 114 * 115 * @return What type of connection we have with the ToolBus. 116 */ 117 public String getType(){ 118 return type; 119 } 120 121 /** 122 * Returns the tool name of the tool that is associated with this tool bridge. 123 * 124 * @return The tool name of the tool that is associated with this tool bridge. 125 */ 126 public String getToolName(){ 127 return toolName; 128 } 129 130 /** 131 * Sets the tool id to the given values. This method will only be called once, after 132 * successfully connecting to the ToolBus. 133 * 134 * @param toolID 135 * The value the toolID has to be set too. 136 */ 137 protected void setToolID(int toolID){ 138 this.toolID = toolID; 139 } 140 141 /** 142 * Returns the tool id of the tool that is associated with this tool bridge. 143 * 144 * @return The tool id of the tool that is associated with this tool bridge. 145 */ 146 public int getToolID(){ 147 return toolID; 148 } 149 150 /** 151 * Returns a reference to the term factory. 152 * 153 * @return A reference to the term factory. 154 */ 155 public PureFactory getFactory(){ 156 return termFactory; 157 } 158 159 /** 160 * Returns the adress of the host the ToolBus is running on. 161 * 162 * @return The adress of the host the ToolBus is running on. Is null when uninitialized. 163 */ 164 protected InetAddress getHost(){ 165 return host; 166 } 167 168 /** 169 * Returns the port number the ToolBus is running on. 170 * 171 * @return The port number the ToolBus is running on. Is -1 when uninitialized. 172 */ 173 protected int getPort(){ 174 return port; 175 } 176 177 /** 178 * Associates an I/O handler with this tool bridge. 179 * 180 * @param ioHandler 181 * The I/O handler to associate this tool bridge with. 182 */ 183 public void setIOHandler(IIOHandler ioHandler){ 184 this.ioHandler = ioHandler; 185 } 186 187 /** 188 * Checks if the tool associated with this tool bridge supplies the given signature. 189 * 190 * @param signatures 191 * The signature we need to compare the interface of the tool too. 192 * @return True if the tool supplies the expected interface; false otherwise. 193 */ 194 public abstract boolean checkSignature(ATerm signature); 195 196 /** 197 * Executes a DO operation with the given ATerm. 198 * 199 * @param aTerm 200 * The ATerm that contains the necessary data to complete the DO request. 201 */ 202 public abstract void doDo(ATerm aTerm); 203 204 /** 205 * Executes an EVAL operation with the given ATerm. 206 * 207 * @param aTerm 208 * The ATerm that contains the necessary data to complete the EVAL request. 209 * @return The result the EVAL request produced (may not be null). 210 */ 211 public abstract ATerm doEval(ATerm aTerm); 212 213 /** 214 * Executes a RECACKEVENT operation with the given ATerm. 215 * 216 * @param aTerm 217 * The ATerm that can potentially contain callback data. 218 */ 219 public abstract void doReceiveAckEvent(ATerm aTerm); 220 221 /** 222 * Executes a TERMINATE operation with the given ATerm. 223 * 224 * @param aTerm 225 * The ATerm that can potentially contain background informantion about the termination request. 226 */ 227 public abstract void doTerminate(ATerm aTerm); 228 229 /** 230 * Retrieves performance statistic information from the tool (if possible). 231 * 232 * @return The gathered performance information (may not be null). 233 */ 234 public abstract ATerm doGetPerformanceStats(); 235 236 /** 237 * @see IDataHandler#send(byte, ATerm) 238 */ 239 public void send(byte operation, ATerm aTerm){ 240 ioHandler.send(operation, termFactory.importTerm(aTerm)); 241 } 242 243 /** 244 * Posts the given event. 245 * 246 * @param aTerm 247 * The event. 248 */ 249 public void postEvent(ATerm aTerm){ 250 long threadId = Thread.currentThread().getId(); 251 ThreadLocalJobQueue threadLocalQueue; 252 synchronized(threadLocalQueues){ 253 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 254 if(threadLocalQueue == null){ 255 threadLocalQueue = new ThreadLocalJobQueue(); 256 threadLocalQueues.put(new Long(threadId), threadLocalQueue); 257 } 258 } 259 threadLocalQueue.postEvent(aTerm, threadId); 260 } 261 262 /** 263 * Posts the given request. 264 * 265 * @param aTerm 266 * The request. 267 */ 268 public ATermAppl postRequest(ATerm aTerm){ 269 long threadId = Thread.currentThread().getId(); 270 ThreadLocalJobQueue threadLocalQueue; 271 synchronized(threadLocalQueues){ 272 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 273 if(threadLocalQueue == null){ 274 threadLocalQueue = new ThreadLocalJobQueue(); 275 threadLocalQueues.put(new Long(threadId), threadLocalQueue); 276 } 277 } 278 Job job = threadLocalQueue.postRequest(aTerm, threadId); 279 return threadLocalQueue.waitForResponse(job); 280 } 281 282 /** 283 * @see IDataHandler#receive(byte, ATerm) 284 */ 285 public void receive(byte operation, final ATerm aTerm){ 286 switch(operation){ 287 case DO: 288 workerQueue.execute(new Runnable(){ 289 public void run(){ 290 doDo(aTerm); 291 send(ACKDO, termFactory.makeList()); 292 } 293 }); 294 break; 295 case EVAL: 296 workerQueue.execute(new Runnable(){ 297 public void run(){ 298 send(VALUE, doEval(aTerm)); 299 } 300 }); 301 break; 302 case ACKEVENT: 303 ATermList ackEvent = ((ATermList) aTerm); 304 ATerm event = ackEvent.getFirst(); 305 306 String ackSourceName = ((ATermAppl) event).getAFun().getName(); 307 308 JobQueue eventQueue; 309 synchronized(queues){ 310 eventQueue = queues.get(ackSourceName); 311 } 312 if(eventQueue == null){ 313 LoggerFactory.log("Received acknowledgement for a non-existent event: " + ackSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL); 314 return; 315 } 316 eventQueue.ackEvent(); 317 318 ATerm callBackInfo = ackEvent.elementAt(1); 319 doReceiveAckEvent(callBackInfo); 320 break; 321 case RESPONSE: 322 ATermAppl response = (ATermAppl) aTerm; 323 String responseSourceName = response.getAFun().toString(); 324 325 JobQueue requestQueue; 326 synchronized(queues){ 327 requestQueue = queues.get(responseSourceName); 328 } 329 if(requestQueue == null){ 330 LoggerFactory.log("Received response on a non-existent request: " + responseSourceName, ILogger.WARNING, IToolBusLoggerConstants.TOOL); 331 return; 332 } 333 requestQueue.recResponse(response); 334 break; 335 case TERMINATE: 336 workerQueue.execute(new Runnable(){ 337 public void run(){ 338 doTerminate(aTerm); 339 terminate(); 340 } 341 }); 342 break; 343 case PERFORMANCESTATS: 344 ATerm performanceStats = doGetPerformanceStats(); 345 send(PERFORMANCESTATS, performanceStats); 346 break; 347 case DEBUGPERFORMANCESTATS: 348 ATerm debugPerformanceStats = doGetPerformanceStats(); 349 send(DEBUGPERFORMANCESTATS, debugPerformanceStats); 350 break; 351 default: 352 LoggerFactory.log("Unkown operation id: " + operation, ILogger.ERROR, IToolBusLoggerConstants.TOOL); 353 } 354 } 355 356 /** 357 * Requests the termination of this tool, so it can perform an orderly shutdown on the toolbus 358 * side. 359 * 360 * @see IDataHandler#terminate() 361 */ 362 public void terminate(){ 363 ioHandler.terminate(); 364 workerQueue.terminate(); 365 } 366 367 /** 368 * Informs the tool that it was shut down by the toolbus. 369 * 370 * @see IDataHandler#shutDown() 371 */ 372 public void shutDown(){ 373 LoggerFactory.log("The connection with the ToolBus was closed. Waiting for the tool to shut down ....", ILogger.INFO, IToolBusLoggerConstants.TOOL); 374 } 375 376 /** 377 * @see IDataHandler#exceptionOccured() 378 */ 379 public void exceptionOccured(){ 380 LoggerFactory.log("Lost connection with the ToolBus. Initiating ungraceful shutdown ....", ILogger.FATAL, IToolBusLoggerConstants.TOOL); 381 382 System.exit(0); 383 } 384 385 /** 386 * Executes the ToolBridge. 387 * 388 * @see Runnable#run() 389 */ 390 public void run(){ 391 // Initialize the worker queue. 392 workerQueue.start(); 393 394 // Only start the connection stuff if we haven't been linked to an I/O handler yet. 395 if(type.equals(AbstractTool.REMOTETOOL)){ 396 if(host == null || port == -1){ 397 LoggerFactory.log("Dunno where the ToolBus is running.", ILogger.FATAL, IToolBusLoggerConstants.TOOL); 398 throw new RuntimeException("Dunno where the ToolBus is running."); 399 } 400 401 ToolConnectionHandler toolConnectionHandler = new ToolConnectionHandler(this, host, port); 402 toolConnectionHandler.run(); 403 }else if(type.equals(AbstractTool.DIRECTTOOL)){ 404 DirectConnectionHandler directConnectionHandler = toolbus.getDirectConnectionHandler(); 405 try{ 406 directConnectionHandler.dock(this); 407 }catch(ToolBusError tberr){ 408 throw new RuntimeException(tberr); 409 } 410 }else{ 411 String error = "Unknown tool type: " + type; 412 LoggerFactory.log(error, ILogger.FATAL, IToolBusLoggerConstants.TOOL); 413 throw new RuntimeException(error); 414 } 415 // Send a connect message to the ToolBus. 416 send(CONNECT, termFactory.makeAppl(termFactory.makeAFun(toolName, 1, false), termFactory.makeInt(toolID))); 417 } 418 419 /** 420 * A job. 421 * 422 * @author Arnold Lankamp 423 */ 424 private static class Job{ 425 public final byte operation; 426 public final ATerm term; 427 public final long threadId; 428 public ATermAppl response; // Optional field 429 430 /** 431 * Constructor. 432 * 433 * @param operation 434 * The op-code. 435 * @param term 436 * The message associated with this event. 437 * @param threadId 438 * The id of the thread associated with this event. 439 */ 440 public Job(byte operation, ATerm term, long threadId){ 441 super(); 442 443 this.operation = operation; 444 this.term = term; 445 this.threadId = threadId; 446 } 447 } 448 449 /** 450 * This job queue holds all the jobs that are send from a single source. 451 * 452 * @author Arnold Lankamp 453 */ 454 private class JobQueue{ 455 private final List<Job> jobs; 456 private Job current; 457 458 /** 459 * Default constructor. 460 */ 461 public JobQueue(){ 462 super(); 463 464 jobs = new LinkedList<Job>(); 465 466 current = null; 467 } 468 469 /** 470 * Schedules the given job for transmission to the ToolBus. If there are currently no 471 * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till 472 * all the requests (of the same source) that where previously scheduled have been send and 473 * acknowledged. 474 * 475 * @param job 476 * A container that hold the details about the request. 477 */ 478 public synchronized void post(Job job){ 479 if(current == null){ 480 ioHandler.send(job.operation, job.term); 481 current = job; 482 }else{ 483 jobs.add(job); 484 } 485 } 486 487 /** 488 * Returns the next job in the queue. 489 * 490 * @return The next job in the queue; null if the queue is empty. 491 */ 492 public synchronized Job getNext(){ 493 if(!jobs.isEmpty()) return jobs.remove(0); 494 495 return null; 496 } 497 498 /** 499 * Notifies the thread local queue of the acknowledgement and executes the next queued job 500 * (if present). 501 */ 502 private void acknowledge(){ 503 long threadId = current.threadId; 504 ThreadLocalJobQueue threadLocalQueue; 505 synchronized(threadLocalQueues){ 506 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 507 } 508 threadLocalQueue.acknowledge(); 509 510 Job next = getNext(); 511 current = next; 512 if(next != null){ 513 ioHandler.send(next.operation, next.term); 514 } 515 } 516 517 /** 518 * Acknowledges the last event that was send from the source this queue is associated with. 519 * It will execute the next job in the queue if there are any. 520 */ 521 public synchronized void ackEvent(){ 522 acknowledge(); 523 } 524 525 /** 526 * Acknowledges the last request that was send from the source this queue is associated 527 * with. It will execute the next job in the queue if there are any. 528 * 529 * @param response 530 * The response. 531 */ 532 public synchronized void recResponse(ATermAppl response){ 533 synchronized(current){ 534 current.response = response; 535 current.notify(); 536 } 537 538 acknowledge(); 539 } 540 } 541 542 /** 543 * This job queue holds all the jobs that are posted by a certain thread. 544 * 545 * @author Arnold Lankamp 546 */ 547 private class ThreadLocalJobQueue{ 548 private final List<Job> requests; 549 550 private boolean awaitingAck; 551 552 /** 553 * Default constructor. 554 */ 555 public ThreadLocalJobQueue(){ 556 super(); 557 558 requests = new LinkedList<Job>(); 559 } 560 561 /** 562 * Schedules the given event for transmission to the ToolBus. If there are currently no 563 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need 564 * to wait till all the jobs (associated with the current thread) that were previously 565 * scheduled have been submitted to the request queue. 566 * 567 * @param aTerm 568 * The term that hold the details about the event. 569 * @param threadId 570 * The id of the thread associated with the event. 571 */ 572 public synchronized void postEvent(ATerm aTerm, long threadId){ 573 Job request = new Job(EVENT, aTerm, threadId); 574 575 if(!awaitingAck){ 576 String sourceName = ((ATermAppl) aTerm).getAFun().getName(); 577 578 JobQueue requestQueue; 579 synchronized(queues){ 580 requestQueue = queues.get(sourceName); 581 if(requestQueue == null){ 582 requestQueue = new JobQueue(); 583 queues.put(sourceName, requestQueue); 584 } 585 } 586 requestQueue.post(request); 587 awaitingAck = true; 588 }else{ 589 requests.add(request); 590 } 591 } 592 593 /** 594 * Schedules the given request for transmission to the ToolBus. If there are currently no 595 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need 596 * to wait till all the jobs (associated with the current thread) that were previously 597 * scheduled have been submitted to the request queue. 598 * 599 * @param aTerm 600 * The term that hold the details about the request. 601 * @param threadId 602 * The id of the thread associated with the request. 603 * @return The received response on the issued request. 604 */ 605 public synchronized Job postRequest(ATerm aTerm, long threadId){ 606 Job job = new Job(REQUEST, aTerm, threadId); 607 if(!awaitingAck){ 608 String sourceName = ((ATermAppl) aTerm).getAFun().getName(); 609 610 JobQueue requestQueue; 611 synchronized(queues){ 612 requestQueue = queues.get(sourceName); 613 if(requestQueue == null){ 614 requestQueue = new JobQueue(); 615 queues.put(sourceName, requestQueue); 616 } 617 } 618 requestQueue.post(job); 619 awaitingAck = true; 620 }else{ 621 requests.add(job); 622 } 623 return job; 624 } 625 626 public ATermAppl waitForResponse(Job job){ 627 synchronized(job){ 628 while(job.response == null){ 629 try{ 630 job.wait(); 631 }catch(InterruptedException irex){ 632 // Ignore this, since I don't want to know about it. 633 } 634 } 635 } 636 return job.response; 637 } 638 639 /** 640 * Returns the next job in the queue. 641 * 642 * @return The next job in the queue; null if the queue is empty. 643 */ 644 public synchronized Job getNext(){ 645 if(!requests.isEmpty()) return requests.remove(0); 646 647 return null; 648 } 649 650 /** 651 * Acknowledges the last job that was send from the source the current thread is associated 652 * with. It will submit the next job in the queue if there are any. 653 */ 654 public synchronized void acknowledge(){ 655 Job next = getNext(); 656 if(next == null){ 657 awaitingAck = false; 658 }else{ 659 ATerm term = next.term; 660 String sourceName = ((ATermAppl) term).getAFun().getName(); 661 662 JobQueue requestQueue; 663 synchronized(queues){ 664 requestQueue = queues.get(sourceName); 665 if(requestQueue == null){ 666 requestQueue = new JobQueue(); 667 queues.put(sourceName, requestQueue); 668 } 669 } 670 671 requestQueue.post(next); 672 } 673 } 674 } 675 676 /** 677 * The queue that is meant to take care of the asynchroneous execution and queueing of anything 678 * that invokes stuff on a tool. 679 * 680 * @author Arnold Lankamp 681 */ 682 private static class WorkerQueue{ 683 private final Object lock = new Object(); 684 685 private final RotatingQueue<Runnable> queue; 686 private final Worker worker; 687 688 /** 689 * Default constructor. 690 */ 691 public WorkerQueue(){ 692 super(); 693 694 ThreadGroup toolGroup = new ThreadGroup("ToolGroup"); 695 queue = new RotatingQueue<Runnable>(); 696 worker = new Worker(toolGroup); 697 698 worker.setDaemon(true); 699 } 700 701 /** 702 * Starts the worker thread. 703 */ 704 public void start(){ 705 worker.start(); 706 } 707 708 /** 709 * Executes or queues the given runnable for execution. 710 * 711 * @param r 712 * The runnable containing the stuff that needs to be executed. 713 */ 714 public void execute(Runnable r){ 715 synchronized(lock){ 716 boolean notBusy = queue.isEmpty(); 717 queue.put(r); 718 if(notBusy) lock.notify(); 719 } 720 } 721 722 /** 723 * Terminates the worker thread as soon as is gracefully possible. 724 */ 725 public void terminate(){ 726 worker.terminate(); 727 } 728 729 /** 730 * The worker thread of this queue. 731 * 732 * @author Arnold Lankamp 733 */ 734 private class Worker extends Thread{ 735 private volatile boolean running; 736 737 /** 738 * Default constructor. 739 * @param threadGroup 740 * The thread group to join. 741 */ 742 public Worker(ThreadGroup threadGroup){ 743 super(threadGroup, "Worker"); 744 745 running = true; 746 } 747 748 /** 749 * The main execution loop. 750 */ 751 public void run(){ 752 do{ 753 Runnable r; 754 synchronized(lock){ 755 r = queue.get(); 756 while(r == null){ 757 try{ 758 lock.wait(); 759 }catch(InterruptedException irex){ 760 // Ignore this. 761 } 762 if(!running) return; 763 764 r = queue.get(); 765 } 766 } 767 768 r.run(); 769 }while(running); 770 } 771 772 /** 773 * Terminates this worker thread after the current iteration. 774 */ 775 public void terminate(){ 776 running = false; 777 synchronized(lock){ 778 lock.notify(); 779 } 780 } 781 } 782 } 783 }