001 package toolbus; 002 003 import java.io.IOException; 004 import java.lang.management.ManagementFactory; 005 import java.lang.management.MemoryMXBean; 006 import java.lang.management.ThreadInfo; 007 import java.lang.management.ThreadMXBean; 008 import java.net.ConnectException; 009 import java.net.InetAddress; 010 import java.net.InetSocketAddress; 011 import java.net.Socket; 012 import java.net.UnknownHostException; 013 import java.nio.ByteBuffer; 014 import java.nio.channels.SocketChannel; 015 import java.util.HashMap; 016 import java.util.HashSet; 017 import java.util.LinkedList; 018 import java.util.List; 019 import java.util.Map; 020 import java.util.Queue; 021 import java.util.Set; 022 023 import jjtraveler.VisitFailure; 024 import aterm.ATerm; 025 import aterm.ATermAppl; 026 import aterm.ATermFactory; 027 import aterm.ATermInt; 028 import aterm.ATermList; 029 import aterm.pure.PureFactory; 030 import aterm.pure.binary.BinaryReader; 031 import aterm.pure.binary.BinaryWriter; 032 033 abstract public class AbstractTool implements Tool, Runnable, IOperations{ 034 private final static int HANDSHAKEBUFFERSIZE = 4096; 035 036 private final ByteBuffer writeBuffer; 037 private final ByteBuffer readBuffer; 038 private SocketChannel socketChannel = null; 039 040 protected final ATermFactory factory; 041 042 private String toolname; 043 private int toolid = -1; 044 045 private String host; 046 private int port = -1; 047 048 private final Map<Long, ThreadLocalJobQueue> threadLocalQueues; 049 private final Map<String, JobQueue> queues; 050 051 private final WorkerQueue workerQueue; 052 053 private ATerm empty; 054 055 private Object lockObject; 056 057 private volatile boolean running; 058 059 private boolean connected; 060 061 private boolean expectingDisconnect = false; 062 063 public AbstractTool(ATermFactory factory){ 064 this.factory = factory; 065 066 empty = factory.makeList(); 067 threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>(); 068 queues = new HashMap<String, JobQueue>(); 069 070 workerQueue = new WorkerQueue(); 071 072 lockObject = this; 073 074 writeBuffer = ByteBuffer.allocateDirect(32768); 075 readBuffer = ByteBuffer.allocateDirect(32768); 076 } 077 078 public void init(String[] args) throws UnknownHostException{ 079 for(int i = 0; i < args.length; i++){ 080 if(args[i].equals("-TB_PORT")){ 081 port = Integer.parseInt(args[++i]); 082 } 083 if(args[i].equals("-TB_HOST")){ 084 host = args[++i]; 085 } 086 if(args[i].equals("-TB_TOOL_NAME")){ 087 toolname = args[++i]; 088 } 089 if(args[i].equals("-TB_TOOL_ID")){ 090 toolid = Integer.parseInt(args[++i]); 091 } 092 } 093 094 if(host == null || port == -1){ 095 throw new RuntimeException("Dunno where the ToolBus is running, so can't connect."); 096 } 097 } 098 099 public void setLockObject(Object lockObject){ 100 this.lockObject = lockObject; 101 } 102 103 public Object getLockObject(){ 104 return lockObject; 105 } 106 107 public void connect() throws IOException{ 108 if(connected) throw new IOException("already connected to ToolBus"); 109 110 try{ 111 socketChannel = SocketChannel.open(); 112 Socket socket = socketChannel.socket(); 113 // Disable Nagle's algorithm, we don't want the random 500ms delays. 114 socket.setTcpNoDelay(true); 115 // Set the traffic class to high throughput and low delay. 116 socket.setTrafficClass(0x18); 117 118 socketChannel.connect(new InetSocketAddress(host, port)); 119 socketChannel.configureBlocking(true); 120 121 shakeHands(); 122 }catch(ConnectException cex){ 123 throw new RuntimeException(cex); 124 }catch(IOException ioex){ 125 closeConnection(); 126 throw new RuntimeException(ioex); 127 }catch(RuntimeException rex){ 128 closeConnection(); 129 throw new RuntimeException(rex); 130 } 131 132 connected = true; 133 } 134 135 public void connect(String tool_name, InetAddress address, int p) throws IOException{ 136 if(connected) throw new IOException("already connected to ToolBus"); 137 138 this.toolname = tool_name; 139 this.host = address.getHostAddress(); 140 this.port = p; 141 142 try{ 143 socketChannel = SocketChannel.open(); 144 Socket socket = socketChannel.socket(); 145 // Disable Nagle's algorithm, we don't want the random 500ms delays. 146 socket.setTcpNoDelay(true); 147 // Set the traffic class to high throughput and low delay. 148 socket.setTrafficClass(0x18); 149 150 socketChannel.connect(new InetSocketAddress(host, port)); 151 socketChannel.configureBlocking(true); 152 153 shakeHands(); 154 }catch(ConnectException cex){ 155 throw new RuntimeException(cex); 156 }catch(IOException ioex){ 157 closeConnection(); 158 throw new RuntimeException(ioex); 159 }catch(RuntimeException rex){ 160 closeConnection(); 161 throw new RuntimeException(rex); 162 } 163 164 connected = true; 165 } 166 167 public void disconnect(){ 168 connected = false; 169 sendTerm(DISCONNECT, empty); 170 171 expectingDisconnect = true; 172 } 173 174 public boolean isConnected(){ 175 return connected; 176 } 177 178 public int getPort(){ 179 return port; 180 } 181 182 public InetAddress getAddress(){ 183 InetAddress adress; 184 try{ 185 adress = InetAddress.getByName(host); 186 }catch(UnknownHostException uhex){ 187 throw new RuntimeException("Unable to resolve ToolBus host adress."); 188 } 189 return adress; 190 } 191 192 private void writeTermToChannel(ATerm aTerm, ByteBuffer byteBuffer) throws IOException{ 193 BinaryWriter binaryWriter = new BinaryWriter(aTerm); 194 while(!binaryWriter.isFinished()){ 195 byteBuffer.clear(); 196 byteBuffer.position(2); 197 try{ 198 binaryWriter.serialize(byteBuffer); 199 }catch(VisitFailure vf){ 200 // Bogus catch block, this can't happen. 201 } 202 // Insert chunk size data. 203 int chunkSize = byteBuffer.limit() - 2; 204 byteBuffer.put(0, (byte) (chunkSize & 0x000000FF)); 205 byteBuffer.put(1, (byte) ((chunkSize >>> 8) & 0x000000FF)); 206 207 // Write chunk 208 int byteWritten = socketChannel.write(byteBuffer); 209 if(byteWritten == -1) throw new IOException("Tool disconnected"); 210 } 211 } 212 213 private ATerm readTermFromChannel(ByteBuffer byteBuffer) throws IOException{ 214 BinaryReader binaryReader = new BinaryReader((PureFactory) factory); 215 while(!binaryReader.isDone()){ 216 byteBuffer.clear(); 217 byteBuffer.limit(2); 218 do{ 219 int bytesRead = socketChannel.read(byteBuffer); 220 if(bytesRead == -1) throw new IOException("Tool disconnected"); 221 }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly. 222 byteBuffer.flip(); 223 224 int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8); 225 226 byteBuffer.clear(); 227 byteBuffer.limit(chunkSize); 228 do{ 229 int bytesRead = socketChannel.read(byteBuffer); 230 if(bytesRead == -1) throw new IOException("Tool disconnected"); 231 }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly. 232 byteBuffer.flip(); 233 234 binaryReader.deserialize(byteBuffer); 235 } 236 237 return binaryReader.getRoot(); 238 } 239 240 private void shakeHands() throws IOException{ 241 ByteBuffer handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE); 242 243 ATerm toolKey = factory.makeAppl(factory.makeAFun(toolname, 1, false), factory.makeInt(toolid)); 244 245 // Send the tool id. 246 writeTermToChannel(toolKey, handShakeBuffer); 247 248 // Receive signature. 249 handShakeBuffer.clear(); 250 readTermFromChannel(handShakeBuffer); 251 252 // TODO Check the signature. 253 byte sigOKByte = 1; 254 255 // Send signature confirmation. 256 handShakeBuffer.clear(); 257 handShakeBuffer.put(sigOKByte); 258 handShakeBuffer.flip(); 259 socketChannel.write(handShakeBuffer); 260 261 // Receive the (permanent) tool key. 262 ATerm newToolKey = readTermFromChannel(handShakeBuffer); 263 264 int toolID = ((ATermInt) newToolKey.getChildAt(0)).getInt(); 265 toolid = toolID; 266 267 sendTerm(CONNECT, newToolKey); 268 } 269 270 private void sendTerm(byte operation, ATerm term){ 271 synchronized(writeBuffer){ 272 writeBuffer.clear(); 273 writeBuffer.put(operation); 274 writeBuffer.flip(); 275 276 try{ 277 int bytesWritten = socketChannel.write(writeBuffer); 278 if(bytesWritten == -1) throw new IOException("Tool disconnected"); 279 280 writeTermToChannel(term, writeBuffer); 281 }catch(IOException ioex){ 282 throw new RuntimeException(ioex); 283 } 284 } 285 } 286 287 private class OperationTermPair{ 288 public byte operation; 289 public ATerm aTerm; 290 } 291 292 public OperationTermPair readTerm() throws IOException{ 293 OperationTermPair otp = new OperationTermPair(); 294 295 synchronized(readBuffer){ 296 readBuffer.clear(); 297 readBuffer.limit(1); 298 int byteRead = socketChannel.read(readBuffer); 299 if(byteRead == -1) return null; 300 readBuffer.flip(); 301 302 otp.operation = readBuffer.get(0); 303 304 otp.aTerm = readTermFromChannel(readBuffer); 305 } 306 307 return otp; 308 } 309 310 private void setRunning(boolean state){ 311 running = state; 312 } 313 314 public void run(){ 315 workerQueue.start(); 316 317 setRunning(true); 318 try{ 319 while(running){ 320 handleIncomingTerm(); 321 } 322 }catch(IOException ioex){ 323 ioex.printStackTrace(); 324 System.exit(0); 325 } 326 } 327 328 public void stopRunning(){ 329 setRunning(false); 330 workerQueue.terminate(); 331 } 332 333 public void handleIncomingTerm() throws IOException{ 334 OperationTermPair otp = readTerm(); 335 if(otp == null){ 336 if(expectingDisconnect){ 337 setRunning(false); 338 return; 339 } 340 throw new IOException("Tool disconnected."); 341 } 342 343 handleIncomingTerm(otp.operation, otp.aTerm); 344 } 345 346 public void handleIncomingTerm(byte operation, ATerm t){ 347 handleTerm(operation, t); 348 } 349 350 protected void handleTerm(byte operation, final ATerm t){ 351 switch(operation){ 352 case DO: 353 workerQueue.execute(new Runnable(){ 354 public void run(){ 355 handler(factory.make("rec-do(<term>)", t)); 356 sendTerm(ACKDO, empty); 357 } 358 }); 359 break; 360 case EVAL: 361 workerQueue.execute(new Runnable(){ 362 public void run(){ 363 ATermAppl result = (ATermAppl) handler(factory.make("rec-eval(<term>)", t)); 364 sendTerm(VALUE, result.getArgument(0)); 365 } 366 }); 367 break; 368 case ACKEVENT: 369 ATerm ackEvent = ((ATermList) t).getLast(); 370 handler(factory.make("rec-ack-event(<term>)", ackEvent)); 371 ackEvent((ATermList) t); 372 break; 373 case RESPONSE: 374 ATermAppl response = (ATermAppl) t; 375 String responseSourceName = response.getAFun().toString(); 376 377 JobQueue requestQueue; 378 synchronized(queues){ 379 requestQueue = queues.get(responseSourceName); 380 } 381 if(requestQueue == null){ 382 System.err.println("Received response on a non-existent request: " + responseSourceName); 383 return; 384 } 385 requestQueue.recResponse(response); 386 break; 387 case TERMINATE: 388 workerQueue.execute(new Runnable(){ 389 public void run(){ 390 handler(factory.make("rec-terminate(<term>)", t)); 391 expectingDisconnect = true; 392 sendTerm(END, empty); 393 } 394 }); 395 break; 396 case PERFORMANCESTATS: 397 ATerm performaceStats = getPerformanceStats(); 398 sendTerm(PERFORMANCESTATS, performaceStats); 399 break; 400 case DEBUGPERFORMANCESTATS: 401 ATerm debugPerformaceStats = getPerformanceStats(); 402 sendTerm(DEBUGPERFORMANCESTATS, debugPerformaceStats); 403 break; 404 case END: 405 connected = false; 406 setRunning(false); 407 closeConnection(); 408 break; 409 default: 410 throw new RuntimeException("Unknown tool operation with ID: " + operation); 411 } 412 } 413 414 public void sendTerm(ATerm term) throws IOException{ 415 throw new IOException("Unsupported operation. Unable to send term: "+term); 416 } 417 418 public void sendEvent(ATerm term){ 419 postEvent(term); 420 } 421 422 public void postEvent(ATerm term){ 423 long threadId = Thread.currentThread().getId(); 424 ThreadLocalJobQueue threadLocalQueue = null; 425 synchronized(threadLocalQueues){ 426 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 427 if(threadLocalQueue == null){ 428 threadLocalQueue = new ThreadLocalJobQueue(); 429 threadLocalQueues.put(new Long(threadId), threadLocalQueue); 430 } 431 } 432 threadLocalQueue.postEvent(term, threadId); 433 } 434 435 public ATerm postRequest(ATerm term){ 436 long threadId = Thread.currentThread().getId(); 437 ThreadLocalJobQueue threadLocalQueue; 438 synchronized(threadLocalQueues){ 439 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 440 if(threadLocalQueue == null){ 441 threadLocalQueue = new ThreadLocalJobQueue(); 442 threadLocalQueues.put(new Long(threadId), threadLocalQueue); 443 } 444 } 445 Job job = threadLocalQueue.postRequest(term, threadId); 446 return threadLocalQueue.waitForResponse(job); 447 } 448 449 private void ackEvent(ATermList ackEvent){ 450 ATerm event = ackEvent.getFirst(); 451 452 String sourceName = ((ATermAppl) event).getAFun().toString(); 453 454 JobQueue eventQueue; 455 synchronized(queues){ 456 eventQueue = queues.get(sourceName); 457 } 458 eventQueue.ackEvent(); 459 } 460 461 private void closeConnection(){ 462 Socket socket = socketChannel.socket(); 463 464 // Close the in- output stream of the socket to ensure that the file descriptors are closed 465 // immidiately and NOT whenever the JVM feels like it. 466 try{ 467 if(!socket.isInputShutdown()) socket.shutdownInput(); 468 }catch(IOException ioex){ 469 // Ignore 470 } 471 try{ 472 if(!socket.isOutputShutdown()) socket.shutdownOutput(); 473 }catch(IOException ioex){ 474 // Ignore 475 } 476 477 try{ 478 if(!socket.isClosed()) socket.close(); 479 }catch(IOException ioex){ 480 ioex.printStackTrace(); 481 } 482 } 483 484 /** 485 * A job. 486 * 487 * @author Arnold Lankamp 488 */ 489 private static class Job{ 490 public final byte operation; 491 public final ATerm term; 492 public final long threadId; 493 public ATerm response; // Optional field 494 495 /** 496 * Constructor. 497 * 498 * @param operation 499 * The op-code. 500 * @param term 501 * The message associated with this event. 502 * @param threadId 503 * The id of the thread associated with this event. 504 */ 505 public Job(byte operation, ATerm term, long threadId){ 506 super(); 507 508 this.operation = operation; 509 this.term = term; 510 this.threadId = threadId; 511 } 512 } 513 514 /** 515 * This job queue holds all the jobs that are send from a single source. 516 * 517 * @author Arnold Lankamp 518 */ 519 private class JobQueue{ 520 private final List<Job> jobs; 521 private Job current; 522 523 /** 524 * Default constructor. 525 */ 526 public JobQueue(){ 527 super(); 528 529 jobs = new LinkedList<Job>(); 530 531 current = null; 532 } 533 534 /** 535 * Schedules the given job for transmission to the ToolBus. If there are currently no 536 * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till 537 * all the requests (of the same source) that where previously scheduled have been send and 538 * acknowledged. 539 * 540 * @param job 541 * A container that hold the details about the request. 542 */ 543 public synchronized void post(Job job){ 544 if(current == null){ 545 sendTerm(job.operation, job.term); 546 current = job; 547 }else{ 548 jobs.add(job); 549 } 550 } 551 552 /** 553 * Returns the next job in the queue. 554 * 555 * @return The next job in the queue; null if the queue is empty. 556 */ 557 public synchronized Job getNext(){ 558 if(!jobs.isEmpty()) return jobs.remove(0); 559 560 return null; 561 } 562 563 /** 564 * Notifies the thread local queue of the acknowledgement and executes the next queued job 565 * (if present). 566 */ 567 private void acknowledge(){ 568 long threadId = current.threadId; 569 ThreadLocalJobQueue threadLocalQueue; 570 synchronized(threadLocalQueues){ 571 threadLocalQueue = threadLocalQueues.get(new Long(threadId)); 572 } 573 threadLocalQueue.acknowledge(); 574 575 Job next = getNext(); 576 current = next; 577 if(next != null){ 578 sendTerm(next.operation, next.term); 579 } 580 } 581 582 /** 583 * Acknowledges the last event that was send from the source this queue is associated with. 584 * It will execute the next job in the queue if there are any. 585 */ 586 public synchronized void ackEvent(){ 587 acknowledge(); 588 } 589 590 /** 591 * Acknowledges the last request that was send from the source this queue is associated 592 * with. It will execute the next job in the queue if there are any. 593 * 594 * @param response 595 * The response. 596 */ 597 public synchronized void recResponse(ATermAppl response){ 598 synchronized(current){ 599 current.response = response; 600 current.notify(); 601 } 602 603 acknowledge(); 604 } 605 } 606 607 /** 608 * This job queue holds all the jobs that are posted by a certain thread. 609 * 610 * @author Arnold Lankamp 611 */ 612 private class ThreadLocalJobQueue{ 613 private final List<Job> requests; 614 615 private boolean awaitingAck; 616 617 /** 618 * Default constructor. 619 */ 620 public ThreadLocalJobQueue(){ 621 super(); 622 623 requests = new LinkedList<Job>(); 624 } 625 626 /** 627 * Schedules the given event for transmission to the ToolBus. If there are currently no 628 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need 629 * to wait till all the jobs (associated with the current thread) that were previously 630 * scheduled have been submitted to the request queue. 631 * 632 * @param aTerm 633 * The term that hold the details about the event. 634 * @param threadId 635 * The id of the thread associated with the event. 636 */ 637 public synchronized void postEvent(ATerm aTerm, long threadId){ 638 Job request = new Job(EVENT, aTerm, threadId); 639 640 if(!awaitingAck){ 641 String sourceName = ((ATermAppl) aTerm).getAFun().getName(); 642 643 JobQueue requestQueue; 644 synchronized(queues){ 645 requestQueue = queues.get(sourceName); 646 if(requestQueue == null){ 647 requestQueue = new JobQueue(); 648 queues.put(sourceName, requestQueue); 649 } 650 } 651 requestQueue.post(request); 652 awaitingAck = true; 653 }else{ 654 requests.add(request); 655 } 656 } 657 658 /** 659 * Schedules the given request for transmission to the ToolBus. If there are currently no 660 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need 661 * to wait till all the jobs (associated with the current thread) that were previously 662 * scheduled have been submitted to the request queue. 663 * 664 * @param aTerm 665 * The term that hold the details about the request. 666 * @param threadId 667 * The id of the thread associated with the request. 668 * @return The received response on the issued request. 669 */ 670 public synchronized Job postRequest(ATerm aTerm, long threadId){ 671 Job job = new Job(REQUEST, aTerm, threadId); 672 if(!awaitingAck){ 673 String sourceName = ((ATermAppl) aTerm).getAFun().getName(); 674 675 JobQueue requestQueue; 676 synchronized(queues){ 677 requestQueue = queues.get(sourceName); 678 if(requestQueue == null){ 679 requestQueue = new JobQueue(); 680 queues.put(sourceName, requestQueue); 681 } 682 } 683 requestQueue.post(job); 684 awaitingAck = true; 685 }else{ 686 requests.add(job); 687 } 688 return job; 689 } 690 691 public ATerm waitForResponse(Job job){ 692 synchronized(job){ 693 while(job.response == null){ 694 try{ 695 job.wait(); 696 }catch(InterruptedException irex){ 697 // Ignore this, since I don't want to know about it. 698 } 699 } 700 } 701 return job.response; 702 } 703 704 /** 705 * Returns the next job in the queue. 706 * 707 * @return The next job in the queue; null if the queue is empty. 708 */ 709 public synchronized Job getNext(){ 710 if(!requests.isEmpty()) return requests.remove(0); 711 712 return null; 713 } 714 715 /** 716 * Acknowledges the last job that was send from the source the current thread is associated 717 * with. It will submit the next job in the queue if there are any. 718 */ 719 public synchronized void acknowledge(){ 720 Job next = getNext(); 721 if(next == null){ 722 awaitingAck = false; 723 }else{ 724 ATerm term = next.term; 725 String sourceName = ((ATermAppl) term).getAFun().getName(); 726 727 JobQueue requestQueue; 728 synchronized(queues){ 729 requestQueue = queues.get(sourceName); 730 if(requestQueue == null){ 731 requestQueue = new JobQueue(); 732 queues.put(sourceName, requestQueue); 733 } 734 } 735 736 requestQueue.post(next); 737 } 738 } 739 } 740 741 /** 742 * The queue that is meant to take care of the asynchroneous execution and queueing of anything 743 * that invokes stuff on a tool. 744 * 745 * @author Arnold Lankamp 746 */ 747 private static class WorkerQueue{ 748 private final Object lock = new Object(); 749 750 private final Queue<Runnable> queue; 751 private final Worker worker; 752 753 /** 754 * Default constructor. 755 */ 756 public WorkerQueue(){ 757 super(); 758 759 this.queue = new LinkedList<Runnable>(); 760 this.worker = new Worker(); 761 worker.setDaemon(true); 762 } 763 764 /** 765 * Starts the worker thread. 766 */ 767 public void start(){ 768 worker.start(); 769 } 770 771 /** 772 * Executes or queues the given runnable for execution. 773 * 774 * @param r 775 * The runnable containing the stuff that needs to be executed. 776 */ 777 public void execute(Runnable r){ 778 synchronized(lock){ 779 boolean notBusy = queue.isEmpty(); 780 queue.offer(r); 781 if(notBusy) lock.notify(); 782 } 783 } 784 785 /** 786 * Terminates the worker thread as soon as is gracefully possible. 787 */ 788 public void terminate(){ 789 worker.terminate(); 790 } 791 792 /** 793 * The worker thread of this queue. 794 * 795 * @author Arnold Lankamp 796 */ 797 private class Worker extends Thread{ 798 private volatile boolean running; 799 800 /** 801 * Default constructor. 802 */ 803 public Worker(){ 804 super(); 805 806 running = true; 807 } 808 809 /** 810 * The main execution loop. 811 */ 812 public void run(){ 813 do{ 814 Runnable r; 815 synchronized(lock){ 816 r = queue.poll(); 817 while(r == null){ 818 try{ 819 lock.wait(); 820 }catch(InterruptedException irex){ 821 // Ignore this. 822 } 823 if(!running) return; 824 825 r = queue.poll(); 826 } 827 } 828 829 r.run(); 830 }while(running); 831 } 832 833 /** 834 * Terminates this worker thread after the current iteration. 835 */ 836 public void terminate(){ 837 running = false; 838 synchronized(lock){ 839 lock.notify(); 840 } 841 } 842 } 843 } 844 845 /** 846 * Gathers performance statistics about this tool, like memory usage and the user-/system-time 847 * spend per thread. 848 * 849 * @return performance statictics. 850 */ 851 private ATerm getPerformanceStats(){ 852 // Type stuff 853 ATerm remote = factory.makeAppl(factory.makeAFun("legacy", 0, true)); 854 ATerm toolType = factory.makeAppl(factory.makeAFun("type", 1, false), remote); 855 856 ATerm java = factory.makeAppl(factory.makeAFun("Java", 0, true)); 857 ATerm toolLanguage = factory.makeAppl(factory.makeAFun("language", 1, false), java); 858 859 ATerm tool = factory.makeAppl(factory.makeAFun("tool", 2, false), toolType, toolLanguage); 860 861 // Memory stuff 862 MemoryMXBean mmxb = ManagementFactory.getMemoryMXBean(); 863 long heapMemoryUsage = mmxb.getHeapMemoryUsage().getUsed(); 864 long nonHeapMemoryUsage = mmxb.getNonHeapMemoryUsage().getUsed(); 865 866 ATerm heapUsage = factory.makeAppl(factory.makeAFun("heap-usage", 1, false), factory.makeInt(((int) (heapMemoryUsage / 1024)))); 867 ATerm nonHeapUsage = factory.makeAppl(factory.makeAFun("non-heap-usage", 1, false), factory.makeInt(((int) (nonHeapMemoryUsage / 1024)))); 868 869 ATerm memory = factory.makeAppl(factory.makeAFun("memory-usage", 2, false), heapUsage, nonHeapUsage); 870 871 // Thread stuff 872 ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup(); 873 Thread[] relevantThreads = new Thread[currentThreadGroup.activeCount() * 2 + 10]; // Create an array that's more then big enough. 874 currentThreadGroup.enumerate(relevantThreads); 875 876 Set<Long> relevantThreadIds = new HashSet<Long>(); 877 for(int i = 0; i < relevantThreads.length; i++){ 878 Thread thread = relevantThreads[i]; 879 if(thread != null){ 880 relevantThreadIds.add(new Long(thread.getId())); 881 } 882 } 883 884 ThreadMXBean tmxb = ManagementFactory.getThreadMXBean(); 885 886 ATerm threads; 887 888 long[] threadIds = tmxb.getAllThreadIds(); 889 int nrOfThreads = threadIds.length; 890 try{ 891 ATermList threadsList = factory.makeList(); 892 for(int i = 0; i < nrOfThreads; i++){ 893 long threadId = threadIds[i]; 894 if(relevantThreadIds.contains(new Long(threadId))){ // Only list the info if we're interested in it. 895 ThreadInfo ti = tmxb.getThreadInfo(threadId); 896 if(ti != null){ 897 String threadName = ti.getThreadName(); 898 long userTime = tmxb.getThreadUserTime(threadIds[i]); 899 long systemTime = tmxb.getThreadCpuTime(threadIds[i]) - userTime; 900 901 if((userTime + systemTime) <= 0) continue; 902 903 ATerm userTimeTerm = factory.makeAppl(factory.makeAFun("user-time", 1, false), factory.makeInt(((int) (userTime / 1000000)))); 904 ATerm systemTimeTerm = factory.makeAppl(factory.makeAFun("system-time", 1, false), factory.makeInt(((int) (systemTime / 1000000)))); 905 ATerm thread = factory.makeAppl(factory.makeAFun(threadName, 2, false), userTimeTerm, systemTimeTerm); 906 907 threadsList = factory.makeList(thread, threadsList); 908 } 909 } 910 } 911 912 threads = factory.makeAppl(factory.makeAFun("threads", 1, false), threadsList); 913 }catch(UnsupportedOperationException uoex){ 914 threads = factory.make("threads(unsupported-operation)"); 915 System.out.println("Thread time profiling is not supported by this JVM."); 916 } 917 918 return factory.makeAppl(factory.makeAFun("performance-stats", 3, false), tool, memory, threads); 919 } 920 }