001 package com.hammurapi.eventbus;
002
003 import java.lang.ref.Reference;
004 import java.lang.ref.WeakReference;
005 import java.util.Arrays;
006 import java.util.Collection;
007 import java.util.Collections;
008 import java.util.Date;
009 import java.util.HashSet;
010 import java.util.List;
011 import java.util.Map;
012 import java.util.Queue;
013 import java.util.Set;
014 import java.util.concurrent.ExecutorService;
015 import java.util.concurrent.TimeUnit;
016 import java.util.concurrent.atomic.AtomicLong;
017 import java.util.concurrent.locks.Lock;
018 import java.util.logging.Level;
019 import java.util.logging.Logger;
020
021 import com.hammurapi.common.ExceptionHandler;
022 import com.hammurapi.common.Util;
023 import com.hammurapi.common.concurrent.TrackingExecutorService;
024 import com.hammurapi.eventbus.Matcher.HandlerManager;
025 import com.hammurapi.eventbus.monitoring.EventBusStats;
026 import com.hammurapi.eventbus.monitoring.Stats;
027 import com.hammurapi.eventbus.monitoring.StatsCollector;
028 import com.hammurapi.extract.ComparisonResult;
029 import com.hammurapi.extract.Extractor;
030 import com.hammurapi.extract.Predicate;
031 import com.hammurapi.store.Store;
032
033 /**
034 * Event bus dispatches events to event handlers.
035 * @author Pavel Vlasov.
036 * @param <E> Event type.
037 * @param <P> Priority type.
038 * @param <C> Context type.
039 * @param <K> Handler registration key.
040 */
041 public abstract class AbstractEventBus<E, P extends Comparable<P>, C, K, H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E,P,C,H,S>> implements EventBus<E, P, C, K, H, S> {
042 protected static final Logger logger = Logger.getLogger(AbstractEventBus.class.getName());
043
044 /**
045 * Interface to output bus structure for troubleshooting.
046 * @author Pavel Vlasov
047 * @param <K> Registration key type.
048 */
049 public interface Snapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> {
050
051 /**
052 * Invoked before any other methods.
053 */
054 void start();
055
056 /**
057 * Invoked at the end of taking snapshot
058 * @param success True if there've been no exceptions.
059 */
060 void end(boolean success);
061
062 void handler(K id, EventHandler<E, P, C, H, S> eventHandler);
063
064 void predicateNode(K id, Predicate<E, C> predicate, Collection<K> trueChildren, Collection<K> trueHandlers, Collection<K> falseChildren, Collection<K> falseHandlers, boolean isRoot);
065
066 void joinInput(K id, K joinNodeId, int index);
067
068 /**
069 *
070 * @param id Join node ID.
071 * @param predicates Node predicates.
072 * @param outputIndices Node output indices.
073 * @param eventHandlerId Handler ID if this node is a final join node.
074 * @param nextJoinNodeId Next join node id for intermediary nodes.
075 */
076 void joinNode(
077 K id,
078 Predicate<E, C> predicate,
079 Set<Integer> outputIndices,
080 K eventHandlerId,
081 K nextJoinNodeId);
082
083 }
084
085 /**
086 * This snapshot also outputs bus state.
087 * @author Pavel Vlasov
088 *
089 * @param <E>
090 * @param <P>
091 * @param <C>
092 * @param <K>
093 * @param <H>
094 * @param <S>
095 */
096 public interface StateSnapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> extends Snapshot<E, P, C, K, H, S> {
097
098 void event(K id, E event, boolean directPost);
099
100 void derivation(K eventId, K eventHandlerId, List<K> inputs);
101
102 void joinInputCollector(K joinNodeId, int[] indices, Collection<K[]> elements);
103
104 }
105
106
107 private Class<E> eventType;
108
109 private InferencePolicy inferencePolicy;
110
111 private TimeUnit statsTimeUnit;
112
113 private StatsCollector statsCollector;
114
115 private class EventBusStatsImpl implements EventBusStats {
116
117 AtomicLong internalPosts = new AtomicLong();
118
119 AtomicLong posts = new AtomicLong();
120
121 AtomicLong postsBaseline = new AtomicLong();
122 AtomicLong conclusionsBaseline = new AtomicLong();
123
124 private Date start = new Date();
125
126 @Override
127 public String getName() {
128 return "Event Bus";
129 }
130
131 @Override
132 public void reset() {
133 internalPosts.set(0);
134 posts.set(0);
135 postsBaseline.set(0);
136 conclusionsBaseline.set(0);
137 start = new Date();
138 }
139
140 @Override
141 public Iterable<Stats> children() {
142 return Collections.emptyList();
143 }
144
145 @Override
146 public long getPosts() {
147 return posts.get();
148 }
149
150 @Override
151 public long getPostsDelta() {
152 long currentPosts = posts.get();
153 return currentPosts - postsBaseline.getAndSet(currentPosts);
154 }
155
156 @Override
157 public long getConclusions() {
158 return internalPosts.get();
159 }
160
161 @Override
162 public long getConclusionsDelta() {
163 long currentConclusions = internalPosts.get();
164 return currentConclusions - conclusionsBaseline.getAndSet(currentConclusions);
165 }
166
167 AtomicLong handlersFired = new AtomicLong();
168 AtomicLong handlersFiredBaseline = new AtomicLong();
169
170 @Override
171 public long getHandlersFired() {
172 return handlersFired.get();
173 }
174
175 @Override
176 public long getHandlersFiredDelta() {
177 long currentHandlersFired = handlersFired.get();
178 return currentHandlersFired - handlersFiredBaseline.getAndSet(currentHandlersFired);
179 }
180
181 @Override
182 public Date getStart() {
183 return start;
184 }
185
186 }
187
188 private EventBusStatsImpl stats;
189
190 private InferenceFilter<E, P, C, K, H, S> inferenceFilter;
191
192 private Matcher<E, P, C, K, H, S> matcher;
193
/**
 * Stores the collaborators, registers bus statistics with the collector
 * (if one is given) and finally attaches this bus to the matcher.
 *
 * @param eventType Event type, returned by {@link #getEventType()}.
 * @param store Event store, returned by {@link #getStore()}.
 * @param inferencePolicy Inference policy.
 * @param inferenceFilter Filter for inference commands, may be null.
 * @param statsCollector Statistics collector; if null, no statistics are kept.
 * @param statsTimeUnit Time unit for statistics; stored only.
 * @param matcher Matcher; must not be null because it is invoked here.
 */
public AbstractEventBus(
        Class<E> eventType,
        S store,
        InferencePolicy inferencePolicy,
        InferenceFilter<E,P,C,K,H,S> inferenceFilter,
        StatsCollector statsCollector,
        TimeUnit statsTimeUnit,
        Matcher<E,P,C,K,H,S> matcher) {

    this.eventType = eventType;
    this.store = store;
    this.inferencePolicy = inferencePolicy;
    this.inferenceFilter = inferenceFilter;
    this.statsTimeUnit = statsTimeUnit;
    this.statsCollector = statsCollector;

    // Statistics are kept only when there is a collector to report them to.
    if (statsCollector != null) {
        this.stats = new EventBusStatsImpl();
        statsCollector.add(this.stats);
    }

    // The matcher is attached last, after all other fields have been assigned.
    this.matcher = matcher;
    this.matcher.setEventBus(this);
}
215
216 @Override
217 public Class<E> getEventType() {
218 return eventType;
219 }
220
221 public InferencePolicy getInferencePolicy() {
222 return inferencePolicy;
223 }
224
225 /** Abstract methods **/
226
227
228 /**
229 * Instantiates master handle.
230 * @param event
231 * @return
232 */
233 protected abstract H newMasterHandle(PostCommand<E,P,C,K,H,S> postCommand);
234
235 protected class CreateMasterHandleResult {
236
237 private boolean isNew;
238 private H handle;
239
240 public boolean isNew() {
241 return isNew;
242 }
243 public H getHandle() {
244 return handle;
245 }
246
247 CreateMasterHandleResult(H handle, boolean isNew) {
248 super();
249 this.handle = handle;
250 this.isNew = isNew;
251 }
252
253
254 }
255
256
257 /**
258 * Returns "root" executor service to submit predicate evaluation and handler execution tasks to.
259 */
260 protected abstract TrackingExecutorService getExecutorService();
261
262 /**
263 * Creates a wrapper around the master executor service for task tracking purposes.
264 * @param master Master executor service
265 * @return
266 */
267 protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
268
269 /**
270 * Create a wrapper around the root executor service for task tracking purposes.
271 * @return executor service or null for the synchronous mode.
272 */
273 protected abstract TrackingExecutorService createExecutorService(boolean oneOff, String name);
274
275 /**
276 * Generates handler ID.
277 * @return
278 */
279 protected abstract K nextId();
280
/**
 * Helper interface for snapshot taking. workingMemorySnapshot() casts
 * each derivation of a stored handle to this interface to obtain the IDs
 * it reports to the snapshot.
 * @author Pavel Vlasov
 *
 * @param <K> ID type.
 */
public interface DerivationEx<K> {

    /**
     * @return ID of the handler reported for the derivation.
     */
    K getHandlerId();

    /**
     * @return IDs of the inputs reported for the derivation.
     */
    List<K> getInputIds();

}
291
292 @SuppressWarnings("unchecked")
293 protected void workingMemorySnapshot(StateSnapshot<E, P, C, K, H, S> snapshot) {
294 getStore().readLock().lock();
295 try {
296 for (H handle: getStore().getAll()) {
297 snapshot.event(handle.getId(), handle.getEvent(), handle.isDirectPost());
298 for (Derivation<E,P,C> d: handle.getDerivations()) {
299 snapshot.derivation(handle.getId(), ((DerivationEx<K>) d).getHandlerId(), ((DerivationEx<K>) d).getInputIds());
300 }
301 }
302 } finally {
303 getStore().readLock().unlock();
304 }
305 }
306
307 // --- End abstract methods ---
308
309
310 public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
311 try {
312 snapshot.start();
313
314 // Step 1 Working memory
315 if (snapshot instanceof StateSnapshot) {
316 workingMemorySnapshot((StateSnapshot<E, P, C, K, H, S>) snapshot);
317 }
318
319 matcher.takeSnapshot(snapshot);
320
321 snapshot.end(true);
322 } catch (Exception e) {
323 snapshot.end(false);
324 throw new DispatchNetworkException("Error taking snapshot: "+e, e);
325 }
326 }
327
328 private ExceptionHandler exceptionHandler;
329
330 /* (non-Javadoc)
331 * @see com.hammurapi.eventbus.EventBus#setExceptionHandler(com.hammurapi.eventbus.ExceptionHandler)
332 */
333 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
334 this.exceptionHandler = exceptionHandler;
335 }
336
337 /* (non-Javadoc)
338 * @see com.hammurapi.eventbus.EventBus#getExceptionHandler()
339 */
340 public ExceptionHandler getExceptionHandler() {
341 return exceptionHandler;
342 }
343
344 // Constructor which takes executor service and collection factory for joiner.
345 // Borrow joiner from
346
347 /* (non-Javadoc)
348 * @see com.hammurapi.eventbus.EventBus#add(com.hammurapi.eventbus.EventHandler, C, boolean, com.hammurapi.extract.Predicate)
349 */
350 @Override
351 public K addHandler(final EventHandler<E, P, C, H, S> eventHandler) {
352 return matcher.addHandler(eventHandler);
353 }
354
355 // private JoinNode createJoinNode(PredicateNode pn,
356 // EventHandlerWrapper<E, P, C, K, H> handler) {
357 // // TODO Auto-generated method stub
358 // return null;
359 // }
360
361 /* (non-Javadoc)
362 * @see com.hammurapi.eventbus.EventBus#remove(K)
363 */
364 public void removeHandlers(Iterable<K> keys) {
365 matcher.removeHandlers(keys);
366 }
367
368 @Override
369 public void removeHandlers(K... keys) {
370 removeHandlers(Arrays.asList(keys));
371 }
372
373 protected abstract Lock getRtcLock();
374
375 /* (non-Javadoc)
376 * @see com.hammurapi.eventbus.EventBus#post(E)
377 */
378 @Override
379 public H post(final E event, Predicate<E, S>... validators) {
380 final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
381 if (fineIsLoggable) {
382 StringBuilder sb = new StringBuilder();
383 sb.append("Client post, event: "+event);
384 if (validators.length>0) {
385 sb.append(", validators: "+Arrays.toString(validators));
386 }
387 logger.fine(sb.toString());
388 }
389
390 if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
391 getRtcLock().lock();
392 }
393 try {
394 boolean exclusiveOrAfterRoot = InferencePolicy.AFTER_ROOT_EVENT.compareTo(inferencePolicy)<=0;
395 PostCommand<E,P,C,K,H,S> postCommand = new PostCommand<E,P,C,K,H,S>(event, true, null, null, null, createInferenceContext(), validators);
396 H ret = processInferenceCommand(postCommand);
397 if (exclusiveOrAfterRoot) {
398 TrackingExecutorService hes = postCommand.getInferenceContext().getExecutorService();
399 if (hes!=null ) {
400 hes.join();
401 }
402 Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = postCommand.getInferenceContext().getInferenceCommandsQueue();
403 while (!inferenceCommandsQueue.isEmpty()) {
404 for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
405 processInferenceCommand(conclusion);
406 }
407 if (hes!=null) {
408 hes.join();
409 }
410 }
411 }
412 return ret;
413 } catch (Exception e) {
414 logger.log(Level.SEVERE, "Post error: "+e, e);
415 if (getExceptionHandler()!=null) {
416 getExceptionHandler().handleException(e);
417 }
418 return null;
419 // masterHandle.handleException(e);
420 } finally {
421 if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
422 getRtcLock().unlock();
423 }
424 }
425 }
426
427 /**
428 * @param command command to be processed
429 * @param executorService Executor service for asynchronous processing
430 * @param inferenceCommandsQueue collector of inference commands for further processing.
431 */
432 @SuppressWarnings("unchecked")
433 protected H processInferenceCommand(InferenceCommand<E, P, C, K, H, S> command) {
434 // Filter commands.
435 if (inferenceFilter!=null && !inferenceFilter.accept(command, this)) {
436 return null;
437 }
438 if (command instanceof PostCommand) {
439 PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) command;
440 if (stats!=null) {
441 if (postCommand.isDirectPost()) {
442 stats.posts.incrementAndGet();
443 } else {
444 stats.internalPosts.incrementAndGet();
445 }
446 }
447
448 try {
449 Handle<E,P,C,K> handle = postCommand.isHandleMode() ? postCommand.getHandle() : createMasterHandle(postCommand);
450 if (postCommand.isDirectPost()) {
451 postCommand.getInferenceContext().setRootHandle(handle);
452 }
453 Iterable<EventHandlerWrapper<E, P, C, K, H, S>> handlers = matcher.match(handle.getEvent(), postCommand.getInferenceContext().getExecutorService());
454
455 Set<P> consumingPriorities = new HashSet<P>();
456 for (EventHandlerWrapper<E, P, C, K, H, S> ehw: handlers) {
457 if (ehw.consumes()) {
458 consumingPriorities.add(ehw.getPriority());
459 }
460 }
461 P prevPriority = null;
462 final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
463 InferenceContext<E, P, C, K, H, S> inferenceContext = postCommand.getInferenceContext().createNext();
464 TrackingExecutorService es = inferenceContext.getExecutorService();
465 for (EventHandlerWrapper<E, P, C, K, H, S> handler: handlers) {
466 // Wait for higher priority handlers to finish.
467 if (es!=null && prevPriority!=null && consumingPriorities.contains(prevPriority) && !handler.getPriority().equals(prevPriority)) {
468 es.join();
469 if (!handle.isValid()) {
470 if (fineIsLoggable) {
471 logger.fine("Event was consumed: "+handle.getEvent());
472 }
473
474 break; // Event has been consumed.
475 }
476 }
477
478 if (fineIsLoggable) {
479 logger.fine("Posting handler to execution: "+handler+" to process event "+handle.getEvent());
480 }
481
482 if (stats!=null) {
483 stats.handlersFired.incrementAndGet();
484 }
485 if (es==null) {
486 handler.post(null, inferenceContext, handle);
487 } else {
488 es.execute(new HandlerTask<E,P,C,K,H,S>(handle, handler, inferenceContext));
489 }
490 prevPriority = handler.getPriority();
491 }
492
493 if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
494 if (es!=null) {
495 es.join();
496 }
497 Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
498 for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
499 processInferenceCommand(conclusion);
500 }
501 }
502 return (H) handle;
503 } catch (InterruptedException e) {
504 throw new EventDispatchException(e);
505 }
506 }
507
508 if (command instanceof RemoveCommand) {
509 processRemoveCommand((RemoveCommand<E,P,C,K,H,S>) command);
510 return null;
511 }
512
513 if (command instanceof RetractCommand) {
514 final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
515
516 if (fineIsLoggable) {
517 logger.fine("Posting retract command to execution: "+command);
518 }
519
520 if (stats!=null) {
521 stats.handlersFired.incrementAndGet();
522 }
523
524 InferenceContext<E, P, C, K, H, S> inferenceContext = command.getInferenceContext().createNext();
525 TrackingExecutorService es = inferenceContext.getExecutorService();
526
527 if (es==null) {
528 RetractTask.processCommand((RetractCommand<E,P,C,K,H,S>) command, inferenceContext);
529 } else {
530 es.execute(new RetractTask<E,P,C,K,H,S>((RetractCommand<E,P,C,K,H,S>) command, inferenceContext));
531 }
532
533 if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
534 if (es!=null) {
535 try {
536 es.join();
537 } catch (InterruptedException e) {
538 throw new EventDispatchException(e);
539 }
540 }
541 Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
542 for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
543 processInferenceCommand(conclusion);
544 }
545 }
546
547 return null;
548 }
549
550 throw new IllegalArgumentException("Unexpected command: "+command);
551 }
552
553 // A trick to avoid exposure of local handle methods.
554 protected abstract void processRemoveCommand(RemoveCommand<E,P,C,K,H,S> command);
555
556 /**
557 * Creates and returns new master handle.
558 * re-dispatch existing event.
559 * @param event Event to add to the store.
560 * @param joinDelegate
561 * @param directPost
562 * @param validators
563 * @param derivation
564 * @return Master handle for new event, null for existing.
565 */
566 protected H createMasterHandle(PostCommand<E,P,C,K,H,S> postCommand) {
567 getStore().writeLock().lock();
568 try {
569 H handle = null;
570 if (getStore().getPrimaryKeyExtractor()==null) {
571 for (H eh: getStore()) {
572 if (eh.isValid() && postCommand.getEvent().equals(eh.getEvent())) {
573 handle = eh;
574 break;
575 }
576 }
577 } else {
578 handle = getStore().getByPrimaryKey(postCommand.getEvent());
579 }
580
581 if (handle!=null && !postCommand.isDirectPost()) {
582 handle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs());
583 return handle;
584 }
585
586 handle = newMasterHandle(postCommand);
587 Predicate<H, S>[] va = new Predicate[postCommand.getValidators().length];
588 final Reference<E> eventReference = new WeakReference<E>(postCommand.getEvent());
589 for (int i=0; i<postCommand.getValidators().length; ++i) {
590 final Predicate<E, S> validator = postCommand.getValidators()[i];
591
592 va[i] = new Predicate<H, S>() {
593
594 @Override
595 public Boolean extract(
596 S context,
597 Map<S, Map<Extractor<H, ? super Boolean, S>, ? super Boolean>> cache,
598 H... obj) {
599 E event = eventReference.get();
600 if (event==null) {
601 return false;
602 }
603 return validator.extract(context, null, Util.wrap(event));
604 }
605
606 @Override
607 public Set<Integer> parameterIndices() {
608 return validator.parameterIndices();
609 }
610
611 @Override
612 public boolean isContextDependent() {
613 return validator.isContextDependent();
614 }
615
616 @Override
617 public ComparisonResult compareTo(Extractor<H, Boolean, S> other) {
618 return ComparisonResult.NOT_EQUAL_NM; // Doesn't matter.
619 }
620
621 @Override
622 public double getCost() {
623 return validator.getCost();
624 }
625
626 };
627 }
628 Store.Handle<H, E, S> sh = getStore().put(handle, va);
629 handle.setStoreHandle(sh);
630 return handle;
631 } finally {
632 getStore().writeLock().unlock();
633 }
634 }
635
636 /* (non-Javadoc)
637 * @see com.hammurapi.eventbus.EventBus#reset()
638 */
639 public void reset() {
640 getStore().clear();
641 matcher.reset();
642 }
643
644 // public void rebuildDispatchNetwork() {
645 // getBusLock().writeLock().lock();
646 // try {
647 // rootNode.rebuild();
648 // } finally {
649 // getBusLock().writeLock().unlock();
650 // }
651 // }
652
653 /**
654 * Store entry complements store handle to provide functionality of event handle.
655 * @author Pavel Vlasov
656 *
657 */
658 public interface StoreEntry<E,P extends Comparable<P>,C,K> {
659
660 /**
661 * @return Derivations of this event.
662 */
663 Collection<Derivation<E,P,C>> getDerivations();
664
665 boolean isDerivedFrom(E event);
666
667 /**
668 * Sets new event. Invoked by substitute().
669 * For internal use, shall not be invoked by client code.
670 * @param event
671 */
672 void setEvent(E event);
673
674 /**
675 * Invalidates this handle, creates a new handle
676 * for updated event, posts updated event to the bus,
677 * For internal use, shall not be invoked by client code.
678 * @return New handle for updated event.
679 */
680 Handle<E,P,C,K> update();
681
682 /**
683 * @return Event id.
684 */
685 K getId();
686 }
687
688 /**
689 * Event handle.
690 * @author Pavel Vlasov
691 *
692 */
693 public interface Handle<E,P extends Comparable<P>,C,K> extends EventBus.Handle<E, P, C>, ExceptionHandler {
694
695 // /**
696 // * Sets new event. Invoked by substitute().
697 // * For internal use, shall not be invoked by client code.
698 // * @param event
699 // */
700 // void setEvent(E event);
701
702 /**
703 * For internal use, shall not be invoked by client code.
704 * @return New handle for updated event.
705 */
706 void update();
707
708 K getId();
709
710 /**
711 * Callback method. For internal use.
712 * @param storeHandle
713 */
714 <H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E, P, C, H, S>> void setStoreHandle(Store.Handle<H, E, S> storeHandle);
715
716 /**
717 * For internal use.
718 * @param derivation
719 */
720 void addDerivation(K handlerId, EventHandler<E, P, C, ?, ?> handler, Handle<E,P,C,K>[] inputs);
721
722 /**
723 * @return true if this handle's event was posted by client code.
724 */
725 boolean isDirectPost();
726
727 /**
728 * Sets direct post to true. For internal use.
729 */
730 void setDirectPost();
731 }
732
733 /* (non-Javadoc)
734 * @see com.hammurapi.eventbus.EventBus#getDerivations(E)
735 */
736 public Collection<Derivation<E,P,C>> getDerivations(E event) {
737 H handle = null;
738 if (getStore().getPrimaryKeyExtractor()==null) {
739 for (H sh: getStore()) {
740 if (event.equals(sh.getEvent())) {
741 handle = sh;
742 break;
743 }
744 }
745 } else {
746 handle = getStore().getByPrimaryKey(event);
747 }
748
749 if (handle==null) {
750 return Collections.emptyList();
751 }
752 return handle.getDerivations();
753 }
754
755 @Override
756 public void join() throws InterruptedException {
757 if (getExecutorService()!=null) {
758 getExecutorService().join();
759 }
760 }
761
762 public boolean join(long timeout) throws InterruptedException {
763 return getExecutorService().join(timeout);
764 }
765
766 private S store;
767
768 @Override
769 public S getStore() {
770 return store;
771 }
772
773 @Override
774 public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
775 matcher.manageHandlers(handlerManager);
776 }
777
778 /**
779 * Creates inference context with zero chain length and no root handle.
780 * Inference queue is supplied with AFTER_EVENT, AFTER_ROOT_EVENT and EXCLUSIVE policies.
781 * Use this method to create initial/root inference context.
782 * @return
783 */
784 protected abstract InferenceContext<E,P,C,K,H,S> createInferenceContext();
785
786 }