001 /**
002 * @author paulk
003 */
004
005 package toolbus;
006
007 import java.io.FileNotFoundException;
008 import java.io.FileReader;
009 import java.io.IOException;
010 import java.io.PrintWriter;
011 import java.io.StringWriter;
012 import java.lang.management.ManagementFactory;
013 import java.lang.management.MemoryMXBean;
014 import java.lang.management.ThreadInfo;
015 import java.lang.management.ThreadMXBean;
016 import java.util.ArrayList;
017 import java.util.Collections;
018 import java.util.HashSet;
019 import java.util.Iterator;
020 import java.util.List;
021
022 import toolbus.atom.Atom;
023 import toolbus.commandline.CommandLine;
024 import toolbus.exceptions.NoSuchProcessDefinitionException;
025 import toolbus.exceptions.SyntaxErrorException;
026 import toolbus.exceptions.ToolBusError;
027 import toolbus.exceptions.ToolBusException;
028 import toolbus.logging.ILogger;
029 import toolbus.logging.IToolBusLoggerConstants;
030 import toolbus.logging.LoggerFactory;
031 import toolbus.matching.MatchStore;
032 import toolbus.parsercup.parser;
033 import toolbus.process.ProcessCall;
034 import toolbus.process.ProcessDefinition;
035 import toolbus.process.ProcessExpression;
036 import toolbus.process.ProcessInstance;
037 import toolbus.tool.ToolDefinition;
038 import toolbus.tool.ToolInstance;
039 import toolbus.tool.execution.DefaultToolExecutorFactory;
040 import toolbus.tool.execution.IToolExecutorFactory;
041 import toolbus.util.collections.ConcurrentHashMap;
042 import toolbus.util.collections.ConcurrentHashMap.ReadOnlyHashMapEntryHandler;
043 import aterm.ATerm;
044
045 /**
046 * ToolBus implements the behaviour of one ToolBus.
047 */
048 public class ToolBus{
049 // The amount of time we wait for tools to terminate when the toolbus's shutdown sequence is
050 // initiated.
051 private final static int SHUTDOWNTIMEOUT = 5000;
052
053 protected final PropertyManager propertyManager;
054
055 protected volatile int portNumber;
056
057 protected final TBTermFactory tbfactory;
058
059 protected final List<ProcessInstance> processes;
060 private int processIdCounter;
061 private final ConcurrentHashMap<String, ProcessDefinition> procdefs;
062
063 protected final MatchStore matchStore;
064
065 private final ConcurrentHashMap<String, ToolDefinition> tooldefs;
066 private final ToolInstanceManager toolInstanceManager;
067 private volatile IToolExecutorFactory toolExecutorFactory;
068
069 protected volatile SocketConnectionHandler connectionHandler;
070 private final DirectConnectionHandler directConnectionHandler;
071
072 private final PrintWriter out;
073
074 protected int nerrors = 0;
075
076 private long startTime;
077 private long nextTime = 0;
078
079 protected volatile boolean shuttingDown = false;
080
081 private volatile String[] scriptsNames = null; // For debugging only.
082
083 private final Object shutdownLock = new Object();
084
085 /**
086 * Constructor with explicit PrintWriter
087 *
088 * @throws ToolBusError
089 */
090 public ToolBus(String[] args, PrintWriter out){
091 super();
092
093 portNumber = -1; // Undefined.
094
095 toolInstanceManager = new ToolInstanceManager();
096 toolExecutorFactory = new DefaultToolExecutorFactory();
097
098 tbfactory = TBTermFactory.getInstance();
099 this.out = out;
100 processes = new ArrayList<ProcessInstance>();
101 processIdCounter = 0;
102 procdefs = new ConcurrentHashMap<String, ProcessDefinition>();
103 tooldefs = new ConcurrentHashMap<String, ToolDefinition>();
104
105 matchStore = new MatchStore(tbfactory);
106
107 propertyManager = new PropertyManager(args);
108
109 directConnectionHandler = new DirectConnectionHandler(this);
110
111 startTime = System.currentTimeMillis();
112 }
113
114 /**
115 * Constructor with implicit PrintWriter
116 *
117 * @throws ToolBusError
118 */
119 public ToolBus(String[] args){
120 this(args, new PrintWriter(System.out));
121 }
122
123 public MatchStore getMatchStore(){
124 return matchStore;
125 }
126
127
128 /**
129 * Constructor with explicit StringWriter
130 *
131 * @throws ToolBusError
132 */
133 public ToolBus(String[] args, StringWriter out){
134 this(args, new PrintWriter(out));
135 }
136
137 public ToolInstanceManager getToolInstanceManager(){
138 return toolInstanceManager;
139 }
140
141 public void setToolExecutorFactory(IToolExecutorFactory toolExecutorFactory){
142 this.toolExecutorFactory = toolExecutorFactory;
143 }
144
145 public IToolExecutorFactory getToolExecutorFactory(){
146 return toolExecutorFactory;
147 }
148
149 public DirectConnectionHandler getDirectConnectionHandler(){
150 return directConnectionHandler;
151 }
152
153 /**
154 * Getters and setters for global properties
155 */
156
157 public String getProperty(String p){
158 String r = propertyManager.get(p);
159 // System.err.println("get(" + p + ") => " + r);
160 return r;
161 }
162
163 public String getProperty(String p, String def){
164 String r = propertyManager.get(p, def);
165 // System.err.println("get(" + p + ", " + def + ") => " + r);
166 return r;
167 }
168
169 public void setProperty(String key, String val){
170 propertyManager.set(key, val);
171 }
172
173 /**
174 * Networking functions
175 */
176
177 static String getHostName(){
178 String hostname = "";
179 try{
180 hostname = java.net.InetAddress.getLocalHost().getHostName();
181 int dotPosition = hostname.indexOf('.');
182 if(dotPosition != -1){
183 hostname = hostname.substring(0, dotPosition);
184 }
185 }catch(java.net.UnknownHostException e){
186 // Ignore, unable to resolve hostname
187 }
188 return hostname;
189 }
190
191 public int getPort(){
192 return portNumber;
193 }
194
195 /**
196 * Get the ATermFactory used.
197 */
198 public TBTermFactory getTBTermFactory(){
199 return tbfactory;
200 }
201
202 /**
203 * Get the current list of processes.
204 */
205 public List<ProcessInstance> getProcesses(){
206 return processes;
207 }
208
209 /**
210 * Get current PrintWriter.
211 */
212 public PrintWriter getPrintWriter(){
213 return out;
214 }
215
216 public void error(String src, String msg){
217 System.err.println(src + ": " + msg);
218 nerrors++;
219 }
220
221 public void clearErrorsAndWarnings(){
222 nerrors = 0;
223 }
224
225 public long getRunTime(){
226 return System.currentTimeMillis() - startTime;
227 }
228
229 /**
230 * Notifies the ToolBus that an atom with a delay on it, which could not be executed, has just been touched.
231 *
232 * @param next
233 * The relative time in milliseconds that needs to elapse before the atom can be executed.
234 */
235 public void setNextTime(long next){
236 //System.err.println("setNextTime: " + next);
237 long currentTime = getRunTime();
238 if((next < nextTime && next > currentTime) || nextTime < currentTime){
239 nextTime = next;
240 }
241 //System.err.println("setNextTime: set to " + nextTime);
242 }
243
244 /**
245 * Parse a Tscript from file and add definitions to this ToolBus.
246 */
247 public void parsecup1(HashSet<String> includedFiles, List<ATerm> toolbusProcessCalls, String filename) throws ToolBusException{
248 try{
249 parser parser_obj = new parser(includedFiles, toolbusProcessCalls, filename, new FileReader(filename), this);
250 parser_obj.parse();
251 }catch(ToolBusException tbex){
252 error(filename, tbex.getMessage());
253 throw tbex;
254 }catch(SyntaxErrorException seex){
255 error(filename, seex.getMessage());
256 throw seex;
257 }catch(FileNotFoundException fnfex){
258 error(filename, fnfex.getMessage());
259 throw new ToolBusException(fnfex.getMessage(), fnfex);
260 }catch(Exception ex){
261 error(filename, ex.getMessage());
262 throw new ToolBusException(ex.getMessage(), ex);
263 }
264 }
265
266 public void parsecup() throws ToolBusException{
267 String filename = propertyManager.get("script.path");
268 if(filename == null) throw new RuntimeException("Script name undefined.");
269
270 try{
271 parser parser_obj = new parser(this, filename);
272 parser_obj.parse();
273
274 // Initialize the signatures.
275 final List<Atom> atomSignature = new ArrayList<Atom>();
276 procdefs.iterate(new ReadOnlyHashMapEntryHandler<String, ProcessDefinition>(){
277 public int handle(String key, ProcessDefinition value){
278 ProcessExpression originalProcessExpression = value.getOriginalProcessExpression();
279 AtomList atoms = originalProcessExpression.getAtoms();
280 Iterator<Atom> atomSetIterator = atoms.iterator();
281 while(atomSetIterator.hasNext()){
282 Atom a = atomSetIterator.next();
283 atomSignature.add(a);
284 }
285
286 return CONTINUE;
287 }
288 });
289
290 matchStore.initialize(atomSignature);
291
292 calculateToolSignatures(atomSignature);
293
294 parser_obj.generateInitialProcessCalls();
295
296 // Keep track of the names of all the scripts for debugging purposes.
297 scriptsNames = parser_obj.scriptsNames();
298 }catch(RuntimeException rex){
299 throw rex;
300 }catch(ToolBusException tbex){
301 error(filename, tbex.getMessage());
302 throw tbex;
303 }catch(FileNotFoundException fnfex){
304 error(filename, fnfex.getMessage());
305 throw new ToolBusException(fnfex.getMessage(), fnfex);
306 }catch(Exception ex){
307 error(filename, ex.getMessage());
308 throw new ToolBusException(ex.getMessage(), ex);
309 }
310 }
311
312 /**
313 * Add a process definition.
314 */
315 public void addProcessDefinition(ProcessDefinition PD) throws ToolBusError{
316 String name = PD.getName();
317 //System.err.println("addProcessDefinition: " + name);
318 if(procdefs.contains(name)) throw new ToolBusError("duplicate definition of process " + name);
319
320 procdefs.put(name, PD);
321 }
322
323 /**
324 * Add a tool definition.
325 */
326 public void addToolDefinition(ToolDefinition TD) throws ToolBusError{
327 String name = TD.getName();
328 if(tooldefs.contains(name)) throw new ToolBusError("duplicate definition of tool " + name);
329
330 tooldefs.put(name, TD);
331 }
332
333 /**
334 * Get a tool definition by name.
335 */
336 public ToolDefinition getToolDefinition(String name) throws ToolBusError{
337 ToolDefinition definition = tooldefs.get(name);
338 if(definition == null) throw new ToolBusError("No tool definition for tool " + name);
339
340 return definition;
341 }
342
343 public List<ToolDefinition> getToolDefinitions(){
344 final List<ToolDefinition> toolDefinitions = new ArrayList<ToolDefinition>();
345
346 tooldefs.iterate(new ReadOnlyHashMapEntryHandler<String, ToolDefinition>(){
347 public int handle(String key, ToolDefinition value){
348 toolDefinitions.add(value);
349
350 return CONTINUE;
351 }
352 });
353
354 return toolDefinitions;
355 }
356
357 private void calculateToolSignatures(final List<Atom> atomSignature){
358 tooldefs.iterate(new ReadOnlyHashMapEntryHandler<String, ToolDefinition>(){
359 public int handle(String key, ToolDefinition value){
360 value.calculateToolSignature(atomSignature);
361
362 return CONTINUE;
363 }
364 });
365 }
366
367 /**
368 * Add a process (as ProcessCall); previous two will become obsolete.
369 */
370 public ProcessInstance addProcess(ProcessCall call) throws ToolBusException{
371 ProcessInstance pi;
372 synchronized(processes){
373 pi = new ProcessInstance(this, call, processIdCounter++);
374 processes.add(pi);
375 }
376 return pi;
377 }
378
379 public List<ProcessDefinition> getProcessDefinitions(){
380 final List<ProcessDefinition> processDefinitions = new ArrayList<ProcessDefinition>();
381
382 procdefs.iterate(new ReadOnlyHashMapEntryHandler<String, ProcessDefinition>(){
383 public int handle(String key, ProcessDefinition value){
384 processDefinitions.add(value);
385
386 return CONTINUE;
387 }
388 });
389
390 return processDefinitions;
391 }
392
393 /**
394 * Get a process definition by name.
395 *
396 * @param numberOfActuals
397 * TODO
398 */
399 public ProcessDefinition getProcessDefinition(String name, int numberOfActuals) throws ToolBusError{
400 ProcessDefinition definition = procdefs.get(name);
401 if(definition == null) throw new NoSuchProcessDefinitionException(name, numberOfActuals);
402
403 return definition;
404 }
405
406 public String[] getIncludedScripts(){
407 return scriptsNames;
408 }
409
410 // TODO Fix the concurrency problems in here.
411 /**
412 * Dumps the current state of all the processes in the ToolBus and the state of all the tools to stderr.
413 */
414 public void showStatus(){
415 if(workHasArrived == true || workHasArrived == false){ // Volatile read; triggers some cache coherency actions, so we see a reasonably up to date version of the data we're trying to dump (this variable is written to very often by the process logic thread; which is the one we want to sync with).
416 System.err.println("--- ToolBus status: " + processes.size() + " processes; " + toolInstanceManager.numberOfConnectedTools() + " tools ----");
417 Iterator<ProcessInstance> processesIterator = new ProcessInstanceIterator(processes);
418 while(processesIterator.hasNext()){
419 ProcessInstance P = processesIterator.next();
420 System.err.println(P.showStatus());
421 }
422 toolInstanceManager.showStatus();
423 System.err.println("------------------------");
424 }
425 }
426
427 private volatile boolean running = false;
428 private final Object processLock = new Object();
429 private volatile boolean workHasArrived = false;
430
431 /**
432 * Shutdown of this ToolBus.
433 */
434 public void shutdown(ATerm msg){
435 shuttingDown = true;
436 System.err.println("Shutting down ToolBus: " + msg);
437
438 Iterator<ProcessInstance> processIterator = new ProcessInstanceIterator(processes);
439 while(processIterator.hasNext()){
440 ProcessInstance pi = processIterator.next();
441 pi.terminate();
442 }
443
444 toolInstanceManager.shutDown(msg);
445
446 // Wait for the connected tools to terminate.
447 long startShutdownTime = System.currentTimeMillis();
448 long endShutdownTime = startShutdownTime + SHUTDOWNTIMEOUT;
449 do{
450 Thread.yield();
451
452 if(System.currentTimeMillis() > endShutdownTime){
453 LoggerFactory.log("Shutdown timeout expired while waiting for the termination of the connected tools.", ILogger.WARNING, IToolBusLoggerConstants.TOOLINSTANCE);
454
455 System.err.println("Tools that didn't shutdown gracefully: ");
456 toolInstanceManager.showStatus();
457
458 LoggerFactory.log("Killing executed tools.", ILogger.WARNING, IToolBusLoggerConstants.TOOLINSTANCE);
459 toolInstanceManager.killExecutedToolsNow();
460 break;
461 }
462 }while(toolInstanceManager.numberOfConnectedTools() > 0);
463
464 connectionHandler.stopRunning();
465
466 synchronized(processLock){
467 running = false;
468 processLock.notify();
469 }
470 }
471
472 /**
473 * Gathers performance statistics and writes them to the console.
474 */
475 public void dumpPerformanceStats(){
476 // Memory stuff
477 MemoryMXBean mmxb = ManagementFactory.getMemoryMXBean();
478 long heapMemoryUsage = mmxb.getHeapMemoryUsage().getUsed();
479 long nonHeapMemoryUsage = mmxb.getNonHeapMemoryUsage().getUsed();
480
481 System.err.println("Memory usage: heap = " + (heapMemoryUsage / 1024) + "KB, non-heap = " + (nonHeapMemoryUsage / 1024) + "KB");
482
483 // Thread stuff
484 ThreadMXBean tmxb = ManagementFactory.getThreadMXBean();
485
486 long[] threadIds = tmxb.getAllThreadIds();
487 int nrOfThreads = threadIds.length;
488 for(int i = 0; i < nrOfThreads; i++){
489 ThreadInfo ti = tmxb.getThreadInfo(threadIds[i]);
490 if(ti != null){
491 String threadName = ti.getThreadName();
492 long userTime = tmxb.getThreadUserTime(threadIds[i]);
493 long systemTime = tmxb.getThreadCpuTime(threadIds[i]) - userTime;
494
495 if((userTime + systemTime) > 0) System.err.println(threadName + " : user time = " + (userTime / 1000000) + ", system time = " + (systemTime / 1000000));
496 }
497 }
498 }
499
500 // TODO Fix the concurrency problems in here.
501 /**
502 * Prints all unhandled messages and queued notes to stderr.
503 */
504 public void dumpUnhandledMessages(){
505 if(workHasArrived == true || workHasArrived == false){ // Volatile read; triggers some cache coherency actions, so we see a reasonably up to date version of the data we're trying to dump (this variable is written to very often by the process logic thread; which is the one we want to sync with).
506 boolean anythingUnhandled = false;
507 ProcessInstanceIterator processInstanceIterator = new ProcessInstanceIterator(processes);
508 try{
509 while(processInstanceIterator.hasNext()){
510 ProcessInstance pi = processInstanceIterator.next();
511 List<ATerm> unhandledNotes = pi.getNoteQueue();
512 List<StateElement> unhandledMessage = pi.getCurrentState().getUnhandledMessages();
513
514 if((unhandledNotes.size() + unhandledMessage.size()) > 0){
515 anythingUnhandled = true;
516
517 System.err.println(pi.getProcessName()+"("+pi.getProcessId()+"):");
518
519 try{
520 System.err.println(unhandledNotes);
521 }catch(RuntimeException rex){
522 // Ignore this so we can go on.
523 }
524
525 try{
526 System.err.println(unhandledMessage);
527 }catch(RuntimeException rex){
528 // Ignore this so we can go on.
529 }
530 }
531 }
532 }catch(RuntimeException rex){
533 // This will probably never happen, but it's necessary none the less.
534 }
535
536 if(!anythingUnhandled){
537 System.err.println("No unhandled messages.");
538 }
539 }
540 }
541
542 public void prepare(){
543 if(nerrors > 0){
544 System.err.println("ToolBus cannot continue execution due to errors in Tscript");
545 return;
546 }
547
548 if(propertyManager.withConsole()) CommandLine.createCommandLine(this, System.in, false);
549
550 // Initialize and start the connection handler.
551 connectionHandler = new SocketConnectionHandler(this);
552
553 try{
554 int userSpecifiedPort = propertyManager.getUserSpecifiedPort();
555 if(userSpecifiedPort == -1){
556 connectionHandler.initialize();
557 }else{
558 connectionHandler.initialize(userSpecifiedPort);
559 }
560 }catch(IOException ioex){
561 LoggerFactory.log("Unable initialize the ToolBus connection handler.", ioex, ILogger.FATAL, IToolBusLoggerConstants.COMMUNICATION);
562 throw new RuntimeException(ioex);
563 }
564 portNumber = connectionHandler.getPort();
565
566 Thread tbConnectionHandler = new Thread(connectionHandler);
567 tbConnectionHandler.setName("ToolBus connection handler");
568 tbConnectionHandler.start();
569
570 System.out.println("The ToolBus server allocated port ("+portNumber+")");
571 }
572
573 // TODO Replace the process logic loop stuff by a full blown scheduler. This code is rather ugly and fragile.
574 /**
575 * The ToolBus's main process loop.
576 * This method handles the execution of the process logic.
577 */
578 public void execute(){
579 ProcessInstance pi = null;
580 ProcessInstanceIterator processesIterator = new ProcessInstanceIterator(processes);
581 running = true;
582 try{
583 PROCESSLOOP: do{
584 long currentNextTime = nextTime;
585 boolean work;
586 do{
587 workHasArrived = false;
588 work = false;
589
590 while(processesIterator.hasNext()){
591 if(shuttingDown) break PROCESSLOOP;
592
593 pi = processesIterator.next();
594 work |= pi.step();
595 if(pi.isTerminated()){
596 processesIterator.remove();
597 pi.terminate();
598
599 // Shut down the ToolBus when there are no more running processes left.
600 if(processes.size() == 0) break PROCESSLOOP;
601 }
602 }
603
604 Collections.rotate(processes, 1);
605
606 processesIterator.reset();
607 }while(work);
608
609 synchronized(processLock){
610 // If we got nothing left to do and we are still running, wait for work to
611 // arrive.
612 while(!workHasArrived && running){
613 try{
614 long blockTime = nextTime - getRunTime(); // Recalculate the delay before sleeping.
615 if(blockTime > 0){
616 processLock.wait(blockTime);
617 workHasArrived = true;
618 }else if(currentNextTime != nextTime){ // If the nextTime changed and the blockTime is zero or less, don't block as there might be work to do.
619 workHasArrived = true;
620 }else{
621 processLock.wait();
622 }
623 }catch(InterruptedException irex){
624 // Just ignore this, it's not harmfull.
625 }
626 }
627 }
628 }while(running);
629 }catch(ToolBusException ex){
630 error("ToolBus exception occured in process " + (pi != null ? pi.getProcessName() : "?"), ex.getMessage());
631 ex.printStackTrace();
632 }catch(RuntimeException rex){
633 error("ToolBus exception occured in process " + (pi != null ? pi.getProcessName() : "?"), rex.getMessage());
634 rex.printStackTrace();
635 }
636
637 // If the ToolBus is still running, shut it down.
638 if(!shuttingDown) shutdown(tbfactory.make("ToolBus halted"));
639
640 synchronized(shutdownLock){
641 shutdownLock.notifyAll();
642 }
643 }
644
645 /**
646 * Notifies the ToolBus that work has arrived.
647 * This will wake the ToolBus's process loop up if it's currently sleeping.
648 *
649 * @param toolInstance
650 * The tool instance associated with the tool that send us data.
651 * @param operation
652 * The operation associated with the package that arrived.
653 */
654 public void workArrived(ToolInstance toolInstance, byte operation){
655 // Only notify when needed.
656 if(!workHasArrived){
657 workHasArrived = true;
658 synchronized(processLock){
659 processLock.notify();
660 }
661 }
662 }
663
664 public void waitTillShutdown(){
665 synchronized(shutdownLock){
666 while(!shuttingDown && running){
667 try{
668 shutdownLock.wait();
669 }catch(InterruptedException irex){
670 // Ignore this exception.
671 }
672 }
673 }
674 }
675
676 /**
677 * Custom iterator class.
678 * This is souly needed, so we aren't being bothered by the 'fail-fast eventhough it's completely pointless, since it's the currently iterating thread that modified something and not a concurrently running one, in which case I'm probably unable to detect it anyway' behaviour.
679 *
680 * @author Arnold Lankamp
681 */
682 protected static class ProcessInstanceIterator implements Iterator<ProcessInstance>{
683 private final List<ProcessInstance> list;
684 private int index;
685
686 public ProcessInstanceIterator(List<ProcessInstance> list){
687 this.list = list;
688 this.index = 0;
689 }
690
691 public boolean hasNext(){
692 synchronized(list){
693 return (list.size() > index);
694 }
695 }
696
697 public ProcessInstance next(){
698 synchronized(list){
699 return list.get(index++);
700 }
701 }
702
703 // You can only call this once per iteration!
704 public void remove(){
705 synchronized(list){
706 list.remove(--index);
707 }
708 }
709
710 public void reset(){
711 index = 0;
712 }
713 }
714 }