001 package toolbus;
002
003 import java.io.IOException;
004 import java.lang.management.ManagementFactory;
005 import java.lang.management.MemoryMXBean;
006 import java.lang.management.ThreadInfo;
007 import java.lang.management.ThreadMXBean;
008 import java.net.ConnectException;
009 import java.net.InetAddress;
010 import java.net.InetSocketAddress;
011 import java.net.Socket;
012 import java.net.UnknownHostException;
013 import java.nio.ByteBuffer;
014 import java.nio.channels.SocketChannel;
015 import java.util.HashMap;
016 import java.util.HashSet;
017 import java.util.LinkedList;
018 import java.util.List;
019 import java.util.Map;
020 import java.util.Queue;
021 import java.util.Set;
022
023 import jjtraveler.VisitFailure;
024 import aterm.ATerm;
025 import aterm.ATermAppl;
026 import aterm.ATermFactory;
027 import aterm.ATermInt;
028 import aterm.ATermList;
029 import aterm.pure.PureFactory;
030 import aterm.pure.binary.BinaryReader;
031 import aterm.pure.binary.BinaryWriter;
032
033 abstract public class AbstractTool implements Tool, Runnable, IOperations{
034 private final static int HANDSHAKEBUFFERSIZE = 4096;
035
036 private final ByteBuffer writeBuffer;
037 private final ByteBuffer readBuffer;
038 private SocketChannel socketChannel = null;
039
040 protected final ATermFactory factory;
041
042 private String toolname;
043 private int toolid = -1;
044
045 private String host;
046 private int port = -1;
047
048 private final Map<Long, ThreadLocalJobQueue> threadLocalQueues;
049 private final Map<String, JobQueue> queues;
050
051 private final WorkerQueue workerQueue;
052
053 private ATerm empty;
054
055 private Object lockObject;
056
057 private volatile boolean running;
058
059 private boolean connected;
060
061 private boolean expectingDisconnect = false;
062
/**
 * Creates a tool that is not connected to a ToolBus yet.
 *
 * @param factory
 *            The term factory this tool constructs its terms with.
 */
public AbstractTool(ATermFactory factory){
	this.factory = factory;
	this.empty = factory.makeList();

	// One direct buffer per direction.
	this.writeBuffer = ByteBuffer.allocateDirect(32768);
	this.readBuffer = ByteBuffer.allocateDirect(32768);

	this.threadLocalQueues = new HashMap<Long, ThreadLocalJobQueue>();
	this.queues = new HashMap<String, JobQueue>();
	this.workerQueue = new WorkerQueue();

	// Unless setLockObject is called, the tool itself is the lock object.
	this.lockObject = this;
}
077
078 public void init(String[] args) throws UnknownHostException{
079 for(int i = 0; i < args.length; i++){
080 if(args[i].equals("-TB_PORT")){
081 port = Integer.parseInt(args[++i]);
082 }
083 if(args[i].equals("-TB_HOST")){
084 host = args[++i];
085 }
086 if(args[i].equals("-TB_TOOL_NAME")){
087 toolname = args[++i];
088 }
089 if(args[i].equals("-TB_TOOL_ID")){
090 toolid = Integer.parseInt(args[++i]);
091 }
092 }
093
094 if(host == null || port == -1){
095 throw new RuntimeException("Dunno where the ToolBus is running, so can't connect.");
096 }
097 }
098
099 public void setLockObject(Object lockObject){
100 this.lockObject = lockObject;
101 }
102
103 public Object getLockObject(){
104 return lockObject;
105 }
106
107 public void connect() throws IOException{
108 if(connected) throw new IOException("already connected to ToolBus");
109
110 try{
111 socketChannel = SocketChannel.open();
112 Socket socket = socketChannel.socket();
113 // Disable Nagle's algorithm, we don't want the random 500ms delays.
114 socket.setTcpNoDelay(true);
115 // Set the traffic class to high throughput and low delay.
116 socket.setTrafficClass(0x18);
117
118 socketChannel.connect(new InetSocketAddress(host, port));
119 socketChannel.configureBlocking(true);
120
121 shakeHands();
122 }catch(ConnectException cex){
123 throw new RuntimeException(cex);
124 }catch(IOException ioex){
125 closeConnection();
126 throw new RuntimeException(ioex);
127 }catch(RuntimeException rex){
128 closeConnection();
129 throw new RuntimeException(rex);
130 }
131
132 connected = true;
133 }
134
135 public void connect(String tool_name, InetAddress address, int p) throws IOException{
136 if(connected) throw new IOException("already connected to ToolBus");
137
138 this.toolname = tool_name;
139 this.host = address.getHostAddress();
140 this.port = p;
141
142 try{
143 socketChannel = SocketChannel.open();
144 Socket socket = socketChannel.socket();
145 // Disable Nagle's algorithm, we don't want the random 500ms delays.
146 socket.setTcpNoDelay(true);
147 // Set the traffic class to high throughput and low delay.
148 socket.setTrafficClass(0x18);
149
150 socketChannel.connect(new InetSocketAddress(host, port));
151 socketChannel.configureBlocking(true);
152
153 shakeHands();
154 }catch(ConnectException cex){
155 throw new RuntimeException(cex);
156 }catch(IOException ioex){
157 closeConnection();
158 throw new RuntimeException(ioex);
159 }catch(RuntimeException rex){
160 closeConnection();
161 throw new RuntimeException(rex);
162 }
163
164 connected = true;
165 }
166
167 public void disconnect(){
168 connected = false;
169 sendTerm(DISCONNECT, empty);
170
171 expectingDisconnect = true;
172 }
173
174 public boolean isConnected(){
175 return connected;
176 }
177
178 public int getPort(){
179 return port;
180 }
181
182 public InetAddress getAddress(){
183 InetAddress adress;
184 try{
185 adress = InetAddress.getByName(host);
186 }catch(UnknownHostException uhex){
187 throw new RuntimeException("Unable to resolve ToolBus host adress.");
188 }
189 return adress;
190 }
191
192 private void writeTermToChannel(ATerm aTerm, ByteBuffer byteBuffer) throws IOException{
193 BinaryWriter binaryWriter = new BinaryWriter(aTerm);
194 while(!binaryWriter.isFinished()){
195 byteBuffer.clear();
196 byteBuffer.position(2);
197 try{
198 binaryWriter.serialize(byteBuffer);
199 }catch(VisitFailure vf){
200 // Bogus catch block, this can't happen.
201 }
202 // Insert chunk size data.
203 int chunkSize = byteBuffer.limit() - 2;
204 byteBuffer.put(0, (byte) (chunkSize & 0x000000FF));
205 byteBuffer.put(1, (byte) ((chunkSize >>> 8) & 0x000000FF));
206
207 // Write chunk
208 int byteWritten = socketChannel.write(byteBuffer);
209 if(byteWritten == -1) throw new IOException("Tool disconnected");
210 }
211 }
212
213 private ATerm readTermFromChannel(ByteBuffer byteBuffer) throws IOException{
214 BinaryReader binaryReader = new BinaryReader((PureFactory) factory);
215 while(!binaryReader.isDone()){
216 byteBuffer.clear();
217 byteBuffer.limit(2);
218 do{
219 int bytesRead = socketChannel.read(byteBuffer);
220 if(bytesRead == -1) throw new IOException("Tool disconnected");
221 }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly.
222 byteBuffer.flip();
223
224 int chunkSize = (byteBuffer.get(0) & 0x000000FF) + ((byteBuffer.get(1) & 0x000000FF) << 8);
225
226 byteBuffer.clear();
227 byteBuffer.limit(chunkSize);
228 do{
229 int bytesRead = socketChannel.read(byteBuffer);
230 if(bytesRead == -1) throw new IOException("Tool disconnected");
231 }while(byteBuffer.hasRemaining()); // <-- Workaround for a NIO bug. Blocking mode doesn't work properly.
232 byteBuffer.flip();
233
234 binaryReader.deserialize(byteBuffer);
235 }
236
237 return binaryReader.getRoot();
238 }
239
240 private void shakeHands() throws IOException{
241 ByteBuffer handShakeBuffer = ByteBuffer.allocateDirect(HANDSHAKEBUFFERSIZE);
242
243 ATerm toolKey = factory.makeAppl(factory.makeAFun(toolname, 1, false), factory.makeInt(toolid));
244
245 // Send the tool id.
246 writeTermToChannel(toolKey, handShakeBuffer);
247
248 // Receive signature.
249 handShakeBuffer.clear();
250 readTermFromChannel(handShakeBuffer);
251
252 // TODO Check the signature.
253 byte sigOKByte = 1;
254
255 // Send signature confirmation.
256 handShakeBuffer.clear();
257 handShakeBuffer.put(sigOKByte);
258 handShakeBuffer.flip();
259 socketChannel.write(handShakeBuffer);
260
261 // Receive the (permanent) tool key.
262 ATerm newToolKey = readTermFromChannel(handShakeBuffer);
263
264 int toolID = ((ATermInt) newToolKey.getChildAt(0)).getInt();
265 toolid = toolID;
266
267 sendTerm(CONNECT, newToolKey);
268 }
269
270 private void sendTerm(byte operation, ATerm term){
271 synchronized(writeBuffer){
272 writeBuffer.clear();
273 writeBuffer.put(operation);
274 writeBuffer.flip();
275
276 try{
277 int bytesWritten = socketChannel.write(writeBuffer);
278 if(bytesWritten == -1) throw new IOException("Tool disconnected");
279
280 writeTermToChannel(term, writeBuffer);
281 }catch(IOException ioex){
282 throw new RuntimeException(ioex);
283 }
284 }
285 }
286
287 private class OperationTermPair{
288 public byte operation;
289 public ATerm aTerm;
290 }
291
292 public OperationTermPair readTerm() throws IOException{
293 OperationTermPair otp = new OperationTermPair();
294
295 synchronized(readBuffer){
296 readBuffer.clear();
297 readBuffer.limit(1);
298 int byteRead = socketChannel.read(readBuffer);
299 if(byteRead == -1) return null;
300 readBuffer.flip();
301
302 otp.operation = readBuffer.get(0);
303
304 otp.aTerm = readTermFromChannel(readBuffer);
305 }
306
307 return otp;
308 }
309
310 private void setRunning(boolean state){
311 running = state;
312 }
313
314 public void run(){
315 workerQueue.start();
316
317 setRunning(true);
318 try{
319 while(running){
320 handleIncomingTerm();
321 }
322 }catch(IOException ioex){
323 ioex.printStackTrace();
324 System.exit(0);
325 }
326 }
327
328 public void stopRunning(){
329 setRunning(false);
330 workerQueue.terminate();
331 }
332
333 public void handleIncomingTerm() throws IOException{
334 OperationTermPair otp = readTerm();
335 if(otp == null){
336 if(expectingDisconnect){
337 setRunning(false);
338 return;
339 }
340 throw new IOException("Tool disconnected.");
341 }
342
343 handleIncomingTerm(otp.operation, otp.aTerm);
344 }
345
346 public void handleIncomingTerm(byte operation, ATerm t){
347 handleTerm(operation, t);
348 }
349
350 protected void handleTerm(byte operation, final ATerm t){
351 switch(operation){
352 case DO:
353 workerQueue.execute(new Runnable(){
354 public void run(){
355 handler(factory.make("rec-do(<term>)", t));
356 sendTerm(ACKDO, empty);
357 }
358 });
359 break;
360 case EVAL:
361 workerQueue.execute(new Runnable(){
362 public void run(){
363 ATermAppl result = (ATermAppl) handler(factory.make("rec-eval(<term>)", t));
364 sendTerm(VALUE, result.getArgument(0));
365 }
366 });
367 break;
368 case ACKEVENT:
369 ATerm ackEvent = ((ATermList) t).getLast();
370 handler(factory.make("rec-ack-event(<term>)", ackEvent));
371 ackEvent((ATermList) t);
372 break;
373 case RESPONSE:
374 ATermAppl response = (ATermAppl) t;
375 String responseSourceName = response.getAFun().toString();
376
377 JobQueue requestQueue;
378 synchronized(queues){
379 requestQueue = queues.get(responseSourceName);
380 }
381 if(requestQueue == null){
382 System.err.println("Received response on a non-existent request: " + responseSourceName);
383 return;
384 }
385 requestQueue.recResponse(response);
386 break;
387 case TERMINATE:
388 workerQueue.execute(new Runnable(){
389 public void run(){
390 handler(factory.make("rec-terminate(<term>)", t));
391 expectingDisconnect = true;
392 sendTerm(END, empty);
393 }
394 });
395 break;
396 case PERFORMANCESTATS:
397 ATerm performaceStats = getPerformanceStats();
398 sendTerm(PERFORMANCESTATS, performaceStats);
399 break;
400 case DEBUGPERFORMANCESTATS:
401 ATerm debugPerformaceStats = getPerformanceStats();
402 sendTerm(DEBUGPERFORMANCESTATS, debugPerformaceStats);
403 break;
404 case END:
405 connected = false;
406 setRunning(false);
407 closeConnection();
408 break;
409 default:
410 throw new RuntimeException("Unknown tool operation with ID: " + operation);
411 }
412 }
413
414 public void sendTerm(ATerm term) throws IOException{
415 throw new IOException("Unsupported operation. Unable to send term: "+term);
416 }
417
418 public void sendEvent(ATerm term){
419 postEvent(term);
420 }
421
422 public void postEvent(ATerm term){
423 long threadId = Thread.currentThread().getId();
424 ThreadLocalJobQueue threadLocalQueue = null;
425 synchronized(threadLocalQueues){
426 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
427 if(threadLocalQueue == null){
428 threadLocalQueue = new ThreadLocalJobQueue();
429 threadLocalQueues.put(new Long(threadId), threadLocalQueue);
430 }
431 }
432 threadLocalQueue.postEvent(term, threadId);
433 }
434
435 public ATerm postRequest(ATerm term){
436 long threadId = Thread.currentThread().getId();
437 ThreadLocalJobQueue threadLocalQueue;
438 synchronized(threadLocalQueues){
439 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
440 if(threadLocalQueue == null){
441 threadLocalQueue = new ThreadLocalJobQueue();
442 threadLocalQueues.put(new Long(threadId), threadLocalQueue);
443 }
444 }
445 Job job = threadLocalQueue.postRequest(term, threadId);
446 return threadLocalQueue.waitForResponse(job);
447 }
448
449 private void ackEvent(ATermList ackEvent){
450 ATerm event = ackEvent.getFirst();
451
452 String sourceName = ((ATermAppl) event).getAFun().toString();
453
454 JobQueue eventQueue;
455 synchronized(queues){
456 eventQueue = queues.get(sourceName);
457 }
458 eventQueue.ackEvent();
459 }
460
461 private void closeConnection(){
462 Socket socket = socketChannel.socket();
463
464 // Close the in- output stream of the socket to ensure that the file descriptors are closed
465 // immidiately and NOT whenever the JVM feels like it.
466 try{
467 if(!socket.isInputShutdown()) socket.shutdownInput();
468 }catch(IOException ioex){
469 // Ignore
470 }
471 try{
472 if(!socket.isOutputShutdown()) socket.shutdownOutput();
473 }catch(IOException ioex){
474 // Ignore
475 }
476
477 try{
478 if(!socket.isClosed()) socket.close();
479 }catch(IOException ioex){
480 ioex.printStackTrace();
481 }
482 }
483
484 /**
485 * A job.
486 *
487 * @author Arnold Lankamp
488 */
489 private static class Job{
490 public final byte operation;
491 public final ATerm term;
492 public final long threadId;
493 public ATerm response; // Optional field
494
495 /**
496 * Constructor.
497 *
498 * @param operation
499 * The op-code.
500 * @param term
501 * The message associated with this event.
502 * @param threadId
503 * The id of the thread associated with this event.
504 */
505 public Job(byte operation, ATerm term, long threadId){
506 super();
507
508 this.operation = operation;
509 this.term = term;
510 this.threadId = threadId;
511 }
512 }
513
514 /**
515 * This job queue holds all the jobs that are send from a single source.
516 *
517 * @author Arnold Lankamp
518 */
519 private class JobQueue{
520 private final List<Job> jobs;
521 private Job current;
522
523 /**
524 * Default constructor.
525 */
526 public JobQueue(){
527 super();
528
529 jobs = new LinkedList<Job>();
530
531 current = null;
532 }
533
534 /**
535 * Schedules the given job for transmission to the ToolBus. If there are currently no
536 * jobs in the queue, the event will be send immediately; otherwise we'll need to wait till
537 * all the requests (of the same source) that where previously scheduled have been send and
538 * acknowledged.
539 *
540 * @param job
541 * A container that hold the details about the request.
542 */
543 public synchronized void post(Job job){
544 if(current == null){
545 sendTerm(job.operation, job.term);
546 current = job;
547 }else{
548 jobs.add(job);
549 }
550 }
551
552 /**
553 * Returns the next job in the queue.
554 *
555 * @return The next job in the queue; null if the queue is empty.
556 */
557 public synchronized Job getNext(){
558 if(!jobs.isEmpty()) return jobs.remove(0);
559
560 return null;
561 }
562
563 /**
564 * Notifies the thread local queue of the acknowledgement and executes the next queued job
565 * (if present).
566 */
567 private void acknowledge(){
568 long threadId = current.threadId;
569 ThreadLocalJobQueue threadLocalQueue;
570 synchronized(threadLocalQueues){
571 threadLocalQueue = threadLocalQueues.get(new Long(threadId));
572 }
573 threadLocalQueue.acknowledge();
574
575 Job next = getNext();
576 current = next;
577 if(next != null){
578 sendTerm(next.operation, next.term);
579 }
580 }
581
582 /**
583 * Acknowledges the last event that was send from the source this queue is associated with.
584 * It will execute the next job in the queue if there are any.
585 */
586 public synchronized void ackEvent(){
587 acknowledge();
588 }
589
590 /**
591 * Acknowledges the last request that was send from the source this queue is associated
592 * with. It will execute the next job in the queue if there are any.
593 *
594 * @param response
595 * The response.
596 */
597 public synchronized void recResponse(ATermAppl response){
598 synchronized(current){
599 current.response = response;
600 current.notify();
601 }
602
603 acknowledge();
604 }
605 }
606
607 /**
608 * This job queue holds all the jobs that are posted by a certain thread.
609 *
610 * @author Arnold Lankamp
611 */
612 private class ThreadLocalJobQueue{
613 private final List<Job> requests;
614
615 private boolean awaitingAck;
616
617 /**
618 * Default constructor.
619 */
620 public ThreadLocalJobQueue(){
621 super();
622
623 requests = new LinkedList<Job>();
624 }
625
626 /**
627 * Schedules the given event for transmission to the ToolBus. If there are currently no
628 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
629 * to wait till all the jobs (associated with the current thread) that were previously
630 * scheduled have been submitted to the request queue.
631 *
632 * @param aTerm
633 * The term that hold the details about the event.
634 * @param threadId
635 * The id of the thread associated with the event.
636 */
637 public synchronized void postEvent(ATerm aTerm, long threadId){
638 Job request = new Job(EVENT, aTerm, threadId);
639
640 if(!awaitingAck){
641 String sourceName = ((ATermAppl) aTerm).getAFun().getName();
642
643 JobQueue requestQueue;
644 synchronized(queues){
645 requestQueue = queues.get(sourceName);
646 if(requestQueue == null){
647 requestQueue = new JobQueue();
648 queues.put(sourceName, requestQueue);
649 }
650 }
651 requestQueue.post(request);
652 awaitingAck = true;
653 }else{
654 requests.add(request);
655 }
656 }
657
658 /**
659 * Schedules the given request for transmission to the ToolBus. If there are currently no
660 * jobs in the thread local queue, the event will be send immediately; otherwise we'll need
661 * to wait till all the jobs (associated with the current thread) that were previously
662 * scheduled have been submitted to the request queue.
663 *
664 * @param aTerm
665 * The term that hold the details about the request.
666 * @param threadId
667 * The id of the thread associated with the request.
668 * @return The received response on the issued request.
669 */
670 public synchronized Job postRequest(ATerm aTerm, long threadId){
671 Job job = new Job(REQUEST, aTerm, threadId);
672 if(!awaitingAck){
673 String sourceName = ((ATermAppl) aTerm).getAFun().getName();
674
675 JobQueue requestQueue;
676 synchronized(queues){
677 requestQueue = queues.get(sourceName);
678 if(requestQueue == null){
679 requestQueue = new JobQueue();
680 queues.put(sourceName, requestQueue);
681 }
682 }
683 requestQueue.post(job);
684 awaitingAck = true;
685 }else{
686 requests.add(job);
687 }
688 return job;
689 }
690
691 public ATerm waitForResponse(Job job){
692 synchronized(job){
693 while(job.response == null){
694 try{
695 job.wait();
696 }catch(InterruptedException irex){
697 // Ignore this, since I don't want to know about it.
698 }
699 }
700 }
701 return job.response;
702 }
703
704 /**
705 * Returns the next job in the queue.
706 *
707 * @return The next job in the queue; null if the queue is empty.
708 */
709 public synchronized Job getNext(){
710 if(!requests.isEmpty()) return requests.remove(0);
711
712 return null;
713 }
714
715 /**
716 * Acknowledges the last job that was send from the source the current thread is associated
717 * with. It will submit the next job in the queue if there are any.
718 */
719 public synchronized void acknowledge(){
720 Job next = getNext();
721 if(next == null){
722 awaitingAck = false;
723 }else{
724 ATerm term = next.term;
725 String sourceName = ((ATermAppl) term).getAFun().getName();
726
727 JobQueue requestQueue;
728 synchronized(queues){
729 requestQueue = queues.get(sourceName);
730 if(requestQueue == null){
731 requestQueue = new JobQueue();
732 queues.put(sourceName, requestQueue);
733 }
734 }
735
736 requestQueue.post(next);
737 }
738 }
739 }
740
741 /**
742 * The queue that is meant to take care of the asynchroneous execution and queueing of anything
743 * that invokes stuff on a tool.
744 *
745 * @author Arnold Lankamp
746 */
747 private static class WorkerQueue{
748 private final Object lock = new Object();
749
750 private final Queue<Runnable> queue;
751 private final Worker worker;
752
753 /**
754 * Default constructor.
755 */
756 public WorkerQueue(){
757 super();
758
759 this.queue = new LinkedList<Runnable>();
760 this.worker = new Worker();
761 worker.setDaemon(true);
762 }
763
764 /**
765 * Starts the worker thread.
766 */
767 public void start(){
768 worker.start();
769 }
770
771 /**
772 * Executes or queues the given runnable for execution.
773 *
774 * @param r
775 * The runnable containing the stuff that needs to be executed.
776 */
777 public void execute(Runnable r){
778 synchronized(lock){
779 boolean notBusy = queue.isEmpty();
780 queue.offer(r);
781 if(notBusy) lock.notify();
782 }
783 }
784
785 /**
786 * Terminates the worker thread as soon as is gracefully possible.
787 */
788 public void terminate(){
789 worker.terminate();
790 }
791
792 /**
793 * The worker thread of this queue.
794 *
795 * @author Arnold Lankamp
796 */
797 private class Worker extends Thread{
798 private volatile boolean running;
799
800 /**
801 * Default constructor.
802 */
803 public Worker(){
804 super();
805
806 running = true;
807 }
808
809 /**
810 * The main execution loop.
811 */
812 public void run(){
813 do{
814 Runnable r;
815 synchronized(lock){
816 r = queue.poll();
817 while(r == null){
818 try{
819 lock.wait();
820 }catch(InterruptedException irex){
821 // Ignore this.
822 }
823 if(!running) return;
824
825 r = queue.poll();
826 }
827 }
828
829 r.run();
830 }while(running);
831 }
832
833 /**
834 * Terminates this worker thread after the current iteration.
835 */
836 public void terminate(){
837 running = false;
838 synchronized(lock){
839 lock.notify();
840 }
841 }
842 }
843 }
844
845 /**
846 * Gathers performance statistics about this tool, like memory usage and the user-/system-time
847 * spend per thread.
848 *
849 * @return performance statictics.
850 */
851 private ATerm getPerformanceStats(){
852 // Type stuff
853 ATerm remote = factory.makeAppl(factory.makeAFun("legacy", 0, true));
854 ATerm toolType = factory.makeAppl(factory.makeAFun("type", 1, false), remote);
855
856 ATerm java = factory.makeAppl(factory.makeAFun("Java", 0, true));
857 ATerm toolLanguage = factory.makeAppl(factory.makeAFun("language", 1, false), java);
858
859 ATerm tool = factory.makeAppl(factory.makeAFun("tool", 2, false), toolType, toolLanguage);
860
861 // Memory stuff
862 MemoryMXBean mmxb = ManagementFactory.getMemoryMXBean();
863 long heapMemoryUsage = mmxb.getHeapMemoryUsage().getUsed();
864 long nonHeapMemoryUsage = mmxb.getNonHeapMemoryUsage().getUsed();
865
866 ATerm heapUsage = factory.makeAppl(factory.makeAFun("heap-usage", 1, false), factory.makeInt(((int) (heapMemoryUsage / 1024))));
867 ATerm nonHeapUsage = factory.makeAppl(factory.makeAFun("non-heap-usage", 1, false), factory.makeInt(((int) (nonHeapMemoryUsage / 1024))));
868
869 ATerm memory = factory.makeAppl(factory.makeAFun("memory-usage", 2, false), heapUsage, nonHeapUsage);
870
871 // Thread stuff
872 ThreadGroup currentThreadGroup = Thread.currentThread().getThreadGroup();
873 Thread[] relevantThreads = new Thread[currentThreadGroup.activeCount() * 2 + 10]; // Create an array that's more then big enough.
874 currentThreadGroup.enumerate(relevantThreads);
875
876 Set<Long> relevantThreadIds = new HashSet<Long>();
877 for(int i = 0; i < relevantThreads.length; i++){
878 Thread thread = relevantThreads[i];
879 if(thread != null){
880 relevantThreadIds.add(new Long(thread.getId()));
881 }
882 }
883
884 ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
885
886 ATerm threads;
887
888 long[] threadIds = tmxb.getAllThreadIds();
889 int nrOfThreads = threadIds.length;
890 try{
891 ATermList threadsList = factory.makeList();
892 for(int i = 0; i < nrOfThreads; i++){
893 long threadId = threadIds[i];
894 if(relevantThreadIds.contains(new Long(threadId))){ // Only list the info if we're interested in it.
895 ThreadInfo ti = tmxb.getThreadInfo(threadId);
896 if(ti != null){
897 String threadName = ti.getThreadName();
898 long userTime = tmxb.getThreadUserTime(threadIds[i]);
899 long systemTime = tmxb.getThreadCpuTime(threadIds[i]) - userTime;
900
901 if((userTime + systemTime) <= 0) continue;
902
903 ATerm userTimeTerm = factory.makeAppl(factory.makeAFun("user-time", 1, false), factory.makeInt(((int) (userTime / 1000000))));
904 ATerm systemTimeTerm = factory.makeAppl(factory.makeAFun("system-time", 1, false), factory.makeInt(((int) (systemTime / 1000000))));
905 ATerm thread = factory.makeAppl(factory.makeAFun(threadName, 2, false), userTimeTerm, systemTimeTerm);
906
907 threadsList = factory.makeList(thread, threadsList);
908 }
909 }
910 }
911
912 threads = factory.makeAppl(factory.makeAFun("threads", 1, false), threadsList);
913 }catch(UnsupportedOperationException uoex){
914 threads = factory.make("threads(unsupported-operation)");
915 System.out.println("Thread time profiling is not supported by this JVM.");
916 }
917
918 return factory.makeAppl(factory.makeAFun("performance-stats", 3, false), tool, memory, threads);
919 }
920 }