001    package com.hammurapi.eventbus;
002    
003    import java.lang.ref.Reference;
004    import java.lang.ref.WeakReference;
005    import java.util.Arrays;
006    import java.util.Collection;
007    import java.util.Collections;
008    import java.util.Date;
009    import java.util.HashSet;
010    import java.util.List;
011    import java.util.Map;
012    import java.util.Queue;
013    import java.util.Set;
014    import java.util.concurrent.ExecutorService;
015    import java.util.concurrent.TimeUnit;
016    import java.util.concurrent.atomic.AtomicLong;
017    import java.util.concurrent.locks.Lock;
018    import java.util.logging.Level;
019    import java.util.logging.Logger;
020    
021    import com.hammurapi.common.ExceptionHandler;
022    import com.hammurapi.common.Util;
023    import com.hammurapi.common.concurrent.TrackingExecutorService;
024    import com.hammurapi.eventbus.Matcher.HandlerManager;
025    import com.hammurapi.eventbus.monitoring.EventBusStats;
026    import com.hammurapi.eventbus.monitoring.Stats;
027    import com.hammurapi.eventbus.monitoring.StatsCollector;
028    import com.hammurapi.extract.ComparisonResult;
029    import com.hammurapi.extract.Extractor;
030    import com.hammurapi.extract.Predicate;
031    import com.hammurapi.store.Store;
032    
033    /**
034     * Event bus dispatches events to event handlers.
035     * @author Pavel Vlasov.
036     * @param <E> Event type.
037     * @param <P> Priority type.
038     * @param <C> Context type.
039     * @param <K> Handler registration key.
040     */
041    public abstract class AbstractEventBus<E, P extends Comparable<P>, C, K, H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E,P,C,H,S>> implements EventBus<E, P, C, K, H, S> {
042            protected static final Logger logger = Logger.getLogger(AbstractEventBus.class.getName());
043                    
044            /**
045             * Interface to output bus structure for troubleshooting.
046             * @author Pavel Vlasov
047             * @param <K> Registration key type.
048             */
049            public interface Snapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> {
050                    
051                    /**
052                     * Invoked before any other methods.
053                     */
054                    void start();
055                    
056                    /**
057                     * Invoked at the end of taking snapshot
058                     * @param success True if there've been no exceptions. 
059                     */
060                    void end(boolean success);
061                    
062                    void handler(K id, EventHandler<E, P, C, H, S> eventHandler);
063                    
064                    void predicateNode(K id, Predicate<E, C> predicate, Collection<K> trueChildren, Collection<K> trueHandlers, Collection<K> falseChildren, Collection<K> falseHandlers, boolean isRoot);
065                    
066                    void joinInput(K id, K joinNodeId, int index);
067                    
068                    /**
069                     * 
070                     * @param id Join node ID.
071                     * @param predicates Node predicates.
072                     * @param outputIndices Node output indices.
073                     * @param eventHandlerId Handler ID if this node is a final join node.
074                     * @param nextJoinNodeId Next join node id for intermediary nodes.
075                     */
076                    void joinNode(
077                                    K id, 
078                                    Predicate<E, C> predicate, 
079                                    Set<Integer> outputIndices, 
080                                    K eventHandlerId, 
081                                    K nextJoinNodeId);
082                    
083            }
084            
085            /**
086             * This snapshot also outputs bus state.
087             * @author Pavel Vlasov
088             *
089             * @param <E>
090             * @param <P>
091             * @param <C>
092             * @param <K>
093             * @param <H>
094             * @param <S>
095             */
096            public interface StateSnapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> extends Snapshot<E, P, C, K, H, S> {
097                    
098                    void event(K id, E event, boolean directPost);
099                    
100                    void derivation(K eventId, K eventHandlerId, List<K> inputs);
101                    
102                    void joinInputCollector(K joinNodeId, int[] indices, Collection<K[]> elements);
103                    
104            }
105            
106    
107            private Class<E> eventType;
108    
109            private InferencePolicy inferencePolicy;
110    
111            private TimeUnit statsTimeUnit;
112    
113            private StatsCollector statsCollector;
114            
115            private class EventBusStatsImpl implements EventBusStats {
116                    
117                    AtomicLong internalPosts = new AtomicLong();
118                    
119                    AtomicLong posts = new AtomicLong();
120                    
121                    AtomicLong postsBaseline = new AtomicLong();            
122                    AtomicLong conclusionsBaseline = new AtomicLong();      
123                    
124                    private Date start = new Date();
125    
126                    @Override
127                    public String getName() {
128                            return "Event Bus";
129                    }
130    
131                    @Override
132                    public void reset() {
133                            internalPosts.set(0);
134                            posts.set(0);
135                            postsBaseline.set(0);
136                            conclusionsBaseline.set(0);
137                            start = new Date();
138                    }
139    
140                    @Override
141                    public Iterable<Stats> children() {
142                            return Collections.emptyList();
143                    }
144    
145                    @Override
146                    public long getPosts() {
147                            return posts.get();
148                    }
149    
150                    @Override
151                    public long getPostsDelta() {
152                            long currentPosts = posts.get();
153                            return currentPosts - postsBaseline.getAndSet(currentPosts);
154                    }
155    
156                    @Override
157                    public long getConclusions() {
158                            return internalPosts.get();
159                    }
160    
161                    @Override
162                    public long getConclusionsDelta() {
163                            long currentConclusions = internalPosts.get();
164                            return currentConclusions - conclusionsBaseline.getAndSet(currentConclusions);
165                    }
166                    
167                    AtomicLong handlersFired = new AtomicLong();
168                    AtomicLong handlersFiredBaseline = new AtomicLong();
169                    
170                    @Override
171                    public long getHandlersFired() {
172                            return handlersFired.get();
173                    }
174                    
175                    @Override
176                    public long getHandlersFiredDelta() {
177                            long currentHandlersFired = handlersFired.get();
178                            return currentHandlersFired - handlersFiredBaseline.getAndSet(currentHandlersFired);
179                    }
180    
181                    @Override
182                    public Date getStart() {
183                            return start;
184                    }
185                    
186            }
187            
188            private EventBusStatsImpl stats;
189    
190            private InferenceFilter<E, P, C, K, H, S> inferenceFilter;
191    
192            private Matcher<E, P, C, K, H, S> matcher;
193            
194            public AbstractEventBus(
195                            Class<E> eventType, 
196                            S store, 
197                            InferencePolicy inferencePolicy,
198                            InferenceFilter<E,P,C,K,H,S> inferenceFilter,
199                            StatsCollector statsCollector,
200                            TimeUnit statsTimeUnit,
201                            Matcher<E,P,C,K,H,S> matcher) {
202                    this.store = store;
203                    this.eventType = eventType;
204                    this.inferencePolicy = inferencePolicy;
205                    this.statsCollector = statsCollector;
206                    if (statsCollector!=null) {
207                            stats = new EventBusStatsImpl();
208                            statsCollector.add(stats);
209                    }
210                    this.statsTimeUnit = statsTimeUnit;
211                    this.inferenceFilter = inferenceFilter;
212                    this.matcher = matcher;
213                    this.matcher.setEventBus(this);
214            }
215            
216            @Override
217            public Class<E> getEventType() {
218                    return eventType;
219            }
220            
221            public InferencePolicy getInferencePolicy() {
222                    return inferencePolicy;
223            }
224            
225            /** Abstract methods **/
226                    
227    
228            /**
229             * Instantiates master handle.
230             * @param event
231             * @return
232             */
233            protected abstract H newMasterHandle(PostCommand<E,P,C,K,H,S> postCommand);
234            
235            protected class CreateMasterHandleResult {
236                    
237                    private boolean isNew;
238                    private H handle;
239                    
240                    public boolean isNew() {
241                            return isNew;
242                    }
243                    public H getHandle() {
244                            return handle;
245                    }
246                    
247                    CreateMasterHandleResult(H handle, boolean isNew) {
248                            super();
249                            this.handle = handle;
250                            this.isNew = isNew;
251                    }
252                    
253                    
254            }
255            
256            
257            /**
258             * Returns "root" executor service to submit predicate evaluation and handler execution tasks to.
259             */
260            protected abstract TrackingExecutorService getExecutorService();
261            
262            /**
263             * Creates a wrapper around the master executor service for task tracking purposes.
264             * @param master Master executor service
265             * @return
266             */
267            protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
268                    
269            /**
270             * Create a wrapper around the root executor service for task tracking purposes.
271             * @return executor service or null for the synchronous mode.
272             */
273            protected abstract TrackingExecutorService createExecutorService(boolean oneOff, String name);  
274                    
275            /**
276             * Generates handler ID.
277             * @return
278             */
279            protected abstract K nextId();
280    
281            /**
282             * Helper interface for snapshot taking.
283             * @author Pavel Vlasov
284             *
285             * @param <K>
286             */
287            public interface DerivationEx<K> {
288                    K getHandlerId();
289                    List<K> getInputIds();
290            }
291                    
292            @SuppressWarnings("unchecked")
293            protected void workingMemorySnapshot(StateSnapshot<E, P, C, K, H, S> snapshot) {
294                    getStore().readLock().lock();
295                    try {
296                            for (H handle: getStore().getAll()) {
297                                    snapshot.event(handle.getId(), handle.getEvent(), handle.isDirectPost());
298                                    for (Derivation<E,P,C> d: handle.getDerivations()) {
299                                            snapshot.derivation(handle.getId(), ((DerivationEx<K>) d).getHandlerId(), ((DerivationEx<K>) d).getInputIds());
300                                    }
301                            }                       
302                    } finally {
303                            getStore().readLock().unlock();
304                    }
305            }
306    
307            // --- End abstract methods ---
308            
309            
310            public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
311                    try {
312                            snapshot.start();
313                            
314                            // Step 1 Working memory
315                            if (snapshot instanceof StateSnapshot) {
316                                    workingMemorySnapshot((StateSnapshot<E, P, C, K, H, S>) snapshot);
317                            }
318                            
319                            matcher.takeSnapshot(snapshot);
320                            
321                            snapshot.end(true);
322                    } catch (Exception e) {
323                            snapshot.end(false);
324                            throw new DispatchNetworkException("Error taking snapshot: "+e, e);
325                    }
326            }
327                    
328            private ExceptionHandler exceptionHandler;
329            
330            /* (non-Javadoc)
331             * @see com.hammurapi.eventbus.EventBus#setExceptionHandler(com.hammurapi.eventbus.ExceptionHandler)
332             */
333            public void setExceptionHandler(ExceptionHandler exceptionHandler) {
334                    this.exceptionHandler = exceptionHandler;
335            }
336            
337            /* (non-Javadoc)
338             * @see com.hammurapi.eventbus.EventBus#getExceptionHandler()
339             */
340            public ExceptionHandler getExceptionHandler() {
341                    return exceptionHandler;
342            }
343            
344            // Constructor which takes executor service and collection factory for joiner.
345            // Borrow joiner from 
346            
347            /* (non-Javadoc)
348             * @see com.hammurapi.eventbus.EventBus#add(com.hammurapi.eventbus.EventHandler, C, boolean, com.hammurapi.extract.Predicate)
349             */
350            @Override
351            public K addHandler(final EventHandler<E, P, C, H, S> eventHandler) {
352                    return matcher.addHandler(eventHandler);
353            }
354            
355    //      private JoinNode createJoinNode(PredicateNode pn,
356    //                      EventHandlerWrapper<E, P, C, K, H> handler) {
357    //              // TODO Auto-generated method stub
358    //              return null;
359    //      }
360                    
361            /* (non-Javadoc)
362             * @see com.hammurapi.eventbus.EventBus#remove(K)
363             */
364            public void removeHandlers(Iterable<K> keys) {
365                    matcher.removeHandlers(keys);
366            }
367            
368            @Override
369            public void removeHandlers(K... keys) {
370                    removeHandlers(Arrays.asList(keys));            
371            }
372            
373            protected abstract Lock getRtcLock();
374            
375            /* (non-Javadoc)
376             * @see com.hammurapi.eventbus.EventBus#post(E)
377             */
378            @Override
379            public H post(final E event, Predicate<E, S>... validators) {
380                    final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
381                    if (fineIsLoggable) {
382                            StringBuilder sb = new StringBuilder();
383                            sb.append("Client post, event: "+event);
384                            if (validators.length>0) {
385                                    sb.append(", validators: "+Arrays.toString(validators));
386                            }
387                            logger.fine(sb.toString());
388                    }
389                    
390                    if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
391                            getRtcLock().lock();
392                    }
393                    try {
394                            boolean exclusiveOrAfterRoot = InferencePolicy.AFTER_ROOT_EVENT.compareTo(inferencePolicy)<=0;
395                            PostCommand<E,P,C,K,H,S> postCommand = new PostCommand<E,P,C,K,H,S>(event, true, null, null, null, createInferenceContext(), validators);
396                            H ret = processInferenceCommand(postCommand);
397                            if (exclusiveOrAfterRoot) {
398                                    TrackingExecutorService hes = postCommand.getInferenceContext().getExecutorService();
399                                    if (hes!=null ) {
400                                            hes.join();
401                                    }
402                                    Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = postCommand.getInferenceContext().getInferenceCommandsQueue(); 
403                                    while (!inferenceCommandsQueue.isEmpty()) {
404                                            for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
405                                                    processInferenceCommand(conclusion);
406                                            }
407                                            if (hes!=null) {
408                                                    hes.join();
409                                            }
410                                    }
411                            }       
412                            return ret;
413                    } catch (Exception e) {
414                            logger.log(Level.SEVERE, "Post error: "+e, e);
415                            if (getExceptionHandler()!=null) {
416                                    getExceptionHandler().handleException(e);
417                            }
418                            return null;
419    //                      masterHandle.handleException(e);                        
420                    } finally {
421                            if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
422                                    getRtcLock().unlock();
423                            }                                                               
424                    }
425            }       
426            
427            /**
428             * @param command command to be processed
429             * @param executorService Executor service for asynchronous processing
430             * @param inferenceCommandsQueue collector of inference commands for further processing. 
431             */
432            @SuppressWarnings("unchecked")
433            protected H processInferenceCommand(InferenceCommand<E, P, C, K, H, S> command) {
434                    // Filter commands.
435                    if (inferenceFilter!=null && !inferenceFilter.accept(command, this)) {
436                            return null;
437                    }
438                    if (command instanceof PostCommand) {
439                            PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) command;
440                            if (stats!=null) {
441                                    if (postCommand.isDirectPost()) {
442                                            stats.posts.incrementAndGet();
443                                    } else {
444                                            stats.internalPosts.incrementAndGet();
445                                    }
446                            }
447                            
448                            try {
449                                    Handle<E,P,C,K> handle = postCommand.isHandleMode() ? postCommand.getHandle() : createMasterHandle(postCommand);
450                                    if (postCommand.isDirectPost()) {
451                                            postCommand.getInferenceContext().setRootHandle(handle);
452                                    }
453                                    Iterable<EventHandlerWrapper<E, P, C, K, H, S>> handlers = matcher.match(handle.getEvent(), postCommand.getInferenceContext().getExecutorService());
454                                    
455                                    Set<P> consumingPriorities = new HashSet<P>();
456                                    for (EventHandlerWrapper<E, P, C, K, H, S> ehw: handlers) {
457                                            if (ehw.consumes()) {
458                                                    consumingPriorities.add(ehw.getPriority());
459                                            }
460                                    }
461                                    P prevPriority = null;
462                                    final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
463                                    InferenceContext<E, P, C, K, H, S> inferenceContext = postCommand.getInferenceContext().createNext();
464                                    TrackingExecutorService es = inferenceContext.getExecutorService();
465                                    for (EventHandlerWrapper<E, P, C, K, H, S> handler: handlers) {                           
466                                            // Wait for higher priority handlers to finish.
467                                            if (es!=null && prevPriority!=null && consumingPriorities.contains(prevPriority) && !handler.getPriority().equals(prevPriority)) {
468                                                    es.join();
469                                                    if (!handle.isValid()) {
470                                                            if (fineIsLoggable) {
471                                                                    logger.fine("Event was consumed: "+handle.getEvent());
472                                                            }
473                                                                                                                    
474                                                            break; // Event has been consumed.
475                                                    }
476                                            }
477                                                    
478                                            if (fineIsLoggable) {
479                                                    logger.fine("Posting handler to execution: "+handler+" to process event "+handle.getEvent());
480                                            }
481                                            
482                                            if (stats!=null) {
483                                                    stats.handlersFired.incrementAndGet();
484                                            }
485                                            if (es==null) {                                         
486                                                    handler.post(null, inferenceContext, handle);                                           
487                                            } else {
488                                                    es.execute(new HandlerTask<E,P,C,K,H,S>(handle, handler, inferenceContext));
489                                            }
490                                            prevPriority = handler.getPriority();
491                                    }
492                                    
493                                    if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
494                                            if (es!=null) {
495                                                    es.join();
496                                            }
497                                            Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
498                                            for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
499                                                    processInferenceCommand(conclusion); 
500                                            }
501                                    }
502                                    return (H) handle;
503                            } catch (InterruptedException e) {
504                                    throw new EventDispatchException(e);
505                            }
506                    } 
507                    
508                    if (command instanceof RemoveCommand) {
509                            processRemoveCommand((RemoveCommand<E,P,C,K,H,S>) command);
510                            return null;
511                    }
512                    
513                    if (command instanceof RetractCommand) {
514                            final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
515                                            
516                            if (fineIsLoggable) {
517                                    logger.fine("Posting retract command to execution: "+command);
518                            }
519                            
520                            if (stats!=null) {
521                                    stats.handlersFired.incrementAndGet();
522                            }
523                            
524                            InferenceContext<E, P, C, K, H, S> inferenceContext = command.getInferenceContext().createNext();
525                            TrackingExecutorService es = inferenceContext.getExecutorService();
526                            
527                            if (es==null) {
528                                    RetractTask.processCommand((RetractCommand<E,P,C,K,H,S>) command, inferenceContext);                                              
529                            } else {
530                                    es.execute(new RetractTask<E,P,C,K,H,S>((RetractCommand<E,P,C,K,H,S>) command, inferenceContext));
531                            }
532                            
533                            if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
534                                    if (es!=null) {
535                                            try {
536                                                    es.join();
537                                            } catch (InterruptedException e) {
538                                                    throw new EventDispatchException(e);
539                                            }
540                                    }
541                                    Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
542                                    for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
543                                            processInferenceCommand(conclusion); 
544                                    }
545                            }                       
546                            
547                            return null;
548                    }
549                    
550                    throw new IllegalArgumentException("Unexpected command: "+command);
551            }
552            
553            // A trick to avoid exposure of local handle methods.
554            protected abstract void processRemoveCommand(RemoveCommand<E,P,C,K,H,S> command);
555            
556            /**
557             * Creates and returns new master handle. 
558             * re-dispatch existing event. 
559             * @param event Event to add to the store.
560             * @param joinDelegate
561             * @param directPost
562             * @param validators
563             * @param derivation 
564             * @return Master handle for new event, null for existing.
565             */
566            protected H createMasterHandle(PostCommand<E,P,C,K,H,S> postCommand) {
567                    getStore().writeLock().lock();
568                    try {
569                            H handle = null;
570                            if (getStore().getPrimaryKeyExtractor()==null) {
571                                    for (H eh: getStore()) {
572                                            if (eh.isValid() && postCommand.getEvent().equals(eh.getEvent())) {
573                                                    handle = eh;
574                                                    break;
575                                            }
576                                    }
577                            } else {
578                                    handle = getStore().getByPrimaryKey(postCommand.getEvent());
579                            }
580                            
581                            if (handle!=null && !postCommand.isDirectPost()) {                              
582                                    handle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs());
583                                    return handle;
584                            }
585                            
586                            handle = newMasterHandle(postCommand);
587                            Predicate<H, S>[] va = new Predicate[postCommand.getValidators().length];
588                            final Reference<E> eventReference = new WeakReference<E>(postCommand.getEvent()); 
589                            for (int i=0; i<postCommand.getValidators().length; ++i) {
590                                    final Predicate<E, S> validator = postCommand.getValidators()[i];
591                                    
592                                    va[i] = new Predicate<H, S>() {
593    
594                                            @Override
595                                            public Boolean extract(
596                                                            S context,
597                                                            Map<S, Map<Extractor<H, ? super Boolean, S>, ? super Boolean>> cache,
598                                                            H... obj) {
599                                                    E event = eventReference.get();
600                                                    if (event==null) {
601                                                            return false;
602                                                    }
603                                                    return validator.extract(context, null, Util.wrap(event));
604                                            }
605    
606                                            @Override
607                                            public Set<Integer> parameterIndices() {
608                                                    return validator.parameterIndices();
609                                            }
610    
611                                            @Override
612                                            public boolean isContextDependent() {
613                                                    return validator.isContextDependent();
614                                            }
615    
616                                            @Override
617                                            public ComparisonResult compareTo(Extractor<H, Boolean, S> other) {
618                                                    return ComparisonResult.NOT_EQUAL_NM; // Doesn't matter.
619                                            }
620    
621                                            @Override
622                                            public double getCost() {
623                                                    return validator.getCost();
624                                            }
625                                            
626                                    };
627                            }
628                            Store.Handle<H, E, S> sh = getStore().put(handle, va);
629                            handle.setStoreHandle(sh);
630                            return handle;
631                    } finally {
632                            getStore().writeLock().unlock();
633                    }
634            }       
635            
636            /* (non-Javadoc)
637             * @see com.hammurapi.eventbus.EventBus#reset()
638             */
639            public void reset() {
640                    getStore().clear();
641                    matcher.reset();
642            }
643            
644    //      public void rebuildDispatchNetwork() {
645    //              getBusLock().writeLock().lock();
646    //              try {
647    //                      rootNode.rebuild();
648    //              } finally {
649    //                      getBusLock().writeLock().unlock();
650    //              }               
651    //      }
652            
653            /**
654             * Store entry complements store handle to provide functionality of event handle. 
655             * @author Pavel Vlasov
656             *
657             */
658            public interface StoreEntry<E,P extends Comparable<P>,C,K> {
659                    
660                    /**
661                     * @return Derivations of this event.
662                     */
663                    Collection<Derivation<E,P,C>> getDerivations();
664                                    
665                    boolean isDerivedFrom(E event);
666                    
667                    /**
668                     * Sets new event. Invoked by substitute(). 
669                     * For internal use, shall not be invoked by client code.
670                     * @param event
671                     */
672                    void setEvent(E event);
673                                                                            
674                    /**
675                     * Invalidates this handle, creates a new handle
676                     * for updated event, posts updated event to the bus,
677                     * For internal use, shall not be invoked by client code.
678                     * @return New handle for updated event.
679                     */
680                    Handle<E,P,C,K> update();
681                    
682                    /**
683                     * @return Event id.
684                     */
685                    K getId();
686            }
687            
688            /**
689             * Event handle. 
690             * @author Pavel Vlasov
691             *
692             */
693            public interface Handle<E,P extends Comparable<P>,C,K> extends EventBus.Handle<E, P, C>, ExceptionHandler {
694                    
695    //              /**
696    //               * Sets new event. Invoked by substitute(). 
697    //               * For internal use, shall not be invoked by client code.
698    //               * @param event
699    //               */
700    //              void setEvent(E event);
701                                                            
702                    /**
703                     * For internal use, shall not be invoked by client code.
704                     * @return New handle for updated event.
705                     */
706                    void update();
707                    
708                    K getId();
709                    
710                    /**
711                     * Callback method. For internal use.
712                     * @param storeHandle
713                     */
714                    <H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E, P, C, H, S>> void setStoreHandle(Store.Handle<H, E, S> storeHandle);
715                    
716                    /**
717                     * For internal use.
718                     * @param derivation
719                     */
720                    void addDerivation(K handlerId, EventHandler<E, P, C, ?, ?> handler, Handle<E,P,C,K>[] inputs);
721                    
722                    /**
723                     * @return true if this handle's event was posted by client code.
724                     */
725                    boolean isDirectPost();
726                    
727                    /**
728                     * Sets direct post to true. For internal use.
729                     */
730                    void setDirectPost();
731            }
732            
733            /* (non-Javadoc)
734             * @see com.hammurapi.eventbus.EventBus#getDerivations(E)
735             */
736            public Collection<Derivation<E,P,C>> getDerivations(E event) {
737                    H handle = null;
738                    if (getStore().getPrimaryKeyExtractor()==null) {
739                            for (H sh: getStore()) {
740                                    if (event.equals(sh.getEvent())) {
741                                            handle = sh;
742                                            break;
743                                    }
744                            }
745                    } else {
746                            handle = getStore().getByPrimaryKey(event);
747                    }
748                    
749                    if (handle==null) {
750                            return Collections.emptyList();
751                    }
752                    return handle.getDerivations();
753            }
754    
755            @Override
756            public void join() throws InterruptedException {
757                    if (getExecutorService()!=null) {
758                            getExecutorService().join();
759                    }
760            }
761            
762            public boolean join(long timeout) throws InterruptedException {
763                    return getExecutorService().join(timeout);
764            }
765            
766            private S store;
767             
768            @Override
769            public S getStore() {
770                    return store;
771            }
772    
773            @Override
774            public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
775                    matcher.manageHandlers(handlerManager);
776            }
777    
778            /**
779             * Creates inference context with zero chain length and no root handle.
780             * Inference queue is supplied with AFTER_EVENT, AFTER_ROOT_EVENT and EXCLUSIVE policies.
781             * Use this method to create initial/root inference context.
782             * @return
783             */
784            protected abstract InferenceContext<E,P,C,K,H,S> createInferenceContext();
785            
786    }