001    package com.hammurapi.eventbus;
002    
003    import java.lang.reflect.Array;
004    import java.util.ArrayList;
005    import java.util.Arrays;
006    import java.util.Collection;
007    import java.util.Collections;
008    import java.util.Comparator;
009    import java.util.HashMap;
010    import java.util.HashSet;
011    import java.util.Iterator;
012    import java.util.LinkedList;
013    import java.util.List;
014    import java.util.ListIterator;
015    import java.util.Map;
016    import java.util.Set;
017    import java.util.TreeSet;
018    import java.util.concurrent.ExecutionException;
019    import java.util.concurrent.ExecutorService;
020    import java.util.concurrent.Future;
021    import java.util.concurrent.atomic.AtomicReference;
022    import java.util.concurrent.locks.ReadWriteLock;
023    import java.util.logging.Level;
024    import java.util.logging.Logger;
025    
026    import com.hammurapi.common.FreezeableCollection;
027    import com.hammurapi.common.Joiner;
028    import com.hammurapi.common.concurrent.TrackingExecutorService;
029    import com.hammurapi.eventbus.AbstractEventBus.Handle;
030    import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
031    import com.hammurapi.extract.And;
032    import com.hammurapi.extract.CommutativeAnd;
033    import com.hammurapi.extract.CommutativeOr;
034    import com.hammurapi.extract.CompositePredicate;
035    import com.hammurapi.extract.Extractor;
036    import com.hammurapi.extract.False;
037    import com.hammurapi.extract.MappedPredicate;
038    import com.hammurapi.extract.Predicate;
039    import com.hammurapi.extract.True;
040    
041    public abstract class PredicateChainingMatcher<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements Matcher<E, P, C, K, H, S> {
042            private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
043    
044            /**
045             * Creates a wrapper around the master executor service for task tracking purposes.
046             * @param master Master executor service
047             * @return
048             */
049            protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
050            
051            @Override
052            public Iterable<EventHandlerWrapper<E, P, C, K, H, S>> match(E event, ExecutorService executorService) {
053                    final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
054                    if (fineIsLoggable) {
055                            logger.fine("Collecting handlers to fire for event "+event);
056                    }
057    
058                    List<EventHandlerWrapper<E, P, C, K, H, S>> handlers = new ArrayList<EventHandlerWrapper<E,P,C,K,H,S>>();
059                    Map<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>>();
060                    
061                    getLock().readLock().lock();
062                    try {
063                            TrackingExecutorService ces = createExecutorService(executorService, true, "Handler collector");
064                            if (ces == null) {
065                                    rootNode.collectHandlers(cache, handlers, event);
066                            } else {
067                                    // Reference for robustness - unset when done. 
068                                    FreezeableCollection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>> collector = new FreezeableCollection<Future<Collection<EventHandlerWrapper<E,P,C,K,H,S>>>>(Collections.synchronizedCollection(new ArrayList<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>()));
069                                    AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>> collectorRef = new AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>>(Collections.synchronizedCollection(collector)); 
070                                    rootNode.collectHandlers(cache, ces, collectorRef, event);
071                                    try {
072                                            ces.join();
073                                    } catch (InterruptedException ie) {
074                                            throw new EventDispatchException(ie);
075                                    }
076    
077                                    collector.freeze();
078                                    for (Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>> future: collectorRef.get()) { // To catch errors when handlers are added after join()
079                                            try {
080                                                    handlers.addAll(future.get());
081                                            } catch (ExecutionException e) {
082                                                    throw new EventDispatchException("Problem collecting handlers to fire: "+e, e);
083                                            } catch (InterruptedException e) {
084                                                    throw new EventDispatchException("Collecting handlers to fire has been interrupted: "+e, e);
085                                            }
086                                    }
087                            }
088                    } finally {
089                            getLock().readLock().unlock();
090                    }
091                    
092                    // Sort handlers before execution.
093                    Collections.sort(handlers, new Comparator<EventHandlerWrapper<E, P, C, K, H, S>>() {
094    
095                            @Override
096                            public int compare(EventHandlerWrapper<E, P, C, K, H, S> o1, EventHandlerWrapper<E, P, C, K, H, S> o2) {                            
097                                    P p1 = o1.getPriority();
098                                    P p2 = o2.getPriority();
099                                    if (p2==null) {
100                                            return p1==null ? o2.hashCode() - o1.hashCode() : 1; 
101                                    }
102                                    if (p1==null) {
103                                            return -1;
104                                    }
105                                    return p2.compareTo(p1);
106                            }
107                            
108                    });
109                    
110                    if (fineIsLoggable) {
111                            logger.fine("Collected "+handlers.size()+" handlers to fire: "+handlers);
112                    }
113                    
114                    return handlers;
115            }
116    
117            @Override
118            public K addHandler(EventHandler<E, P, C, H, S> eventHandler) {
119                    if (eventHandler.getCardinality()<1) {
120                            throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
121                    }
122                    getLock().writeLock().lock();
123                    try {
124                            final K ret = nextId();                                         
125                            addInternal(wrap(eventHandler, ret));                   
126                            return ret;
127                    } finally {
128                            getLock().writeLock().unlock();
129                    }
130            }
131            
132            /**
133             * Lock for the inference network.
134             * @return
135             */
136            protected abstract ReadWriteLock getLock();
137    
138            @Override
139            public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
140                    getLock().writeLock().lock(); // Locking the bus for writing and as such for reading.
141                    try {
142                            // Step 2 Dispatch network & join collections.
143                            rootNode.takeSnapshot(snapshot, new HashSet<K>());
144                    } finally {
145                            getLock().writeLock().unlock(); // Done!
146                    }
147            }
148            
149            private void addInternal(EventHandlerWrapper<E, P, C, K, H, S> wrappedHandler) {
150                    if (wrappedHandler.getPredicate() instanceof CommutativeOr) {
151                            for (Predicate<E, C> part: ((CommutativeOr<E, C>) wrappedHandler.getPredicate()).getParts()) {
152                                    addInternal(new EventHandlerWrapperFilter<E, P, C, K, H, S>(wrappedHandler, part));
153                            }
154                    } else {
155                            // Don't bother if always false
156                            if (!False.getInstance().equals(wrappedHandler.getPredicate())) {
157                                    if (wrappedHandler.getCardinality()==1) {
158                                            if (!rootNode.addHandler(wrappedHandler, null)) {
159                                                    throw new DispatchNetworkException("Predicate "+wrappedHandler.getPredicate()+" cannot be added to the root node, handler "+wrappedHandler.getHandler());
160                                            }
161                                    } else {
162                                            for (JoinInput ji: buildJoinNetwork(wrappedHandler)) {                                          
163                                                    if (!rootNode.addHandler(ji.handler, null)) {
164                                                            throw new DispatchNetworkException("Predicate "+ji.predicate+" cannot be added to the root node, for join handler");                                                    
165                                                    }
166                                            }
167                                    }
168                            }
169                    }
170            }
171            
172            protected class JoinInput {
173                    
174                    K id;
175                    
176                    public JoinInput(int index) {
177                            this.index = index;
178                            this.id = nextId();
179                            this.map = new int[] {index};
180                    }
181                    
182                    EventHandlerWrapper<E, P, C, K, H, S> handler;
183                    
184                    Predicate<E, C> predicate;
185    
186                    JoinNode joinNode;
187                    
188                    final int index;
189                    
190                    final int[] map;
191                    
192            }
193            
194            //      private JoinNode createJoinNode(PredicateNode pn,
195            //                      EventHandlerWrapper<E, P, C, K, H> handler) {
196            //              // TODO Auto-generated method stub
197            //              return null;
198            //      }
199                            
200                    protected class JoinInputEventHandler implements EventHandlerWrapper<E, P, C, K, H, S> {
201                            
202                            private EventBusJoiner joiner;
203                            private int idx;
204                            final K id = nextId();
205                            private EventHandlerWrapper<E, P, C, K, H, S> handler;
206                            private JoinNode joinNode;
207                            private Predicate<E, C> predicate;
208                            private int cardinality;
209            
210                            JoinInputEventHandler(
211                                            JoinNode joinNode, 
212                                            int idx,                                
213                                            EventHandlerWrapper<E, P, C, K, H, S> handler,
214                                            Predicate<E, C> predicate,
215                                            int cardinality) {
216                                    
217                                    this.joiner = joinNode.joiner;
218                                    this.joinNode = joinNode;
219                                    this.idx = idx;
220                                    this.handler = handler;
221                                    this.joinNode = joinNode;       
222                                    this.predicate = predicate;
223                                    this.cardinality = cardinality;
224                            }
225                            
226                            public K getId() {
227                                    return id;
228                            }
229            
230                            @Override
231                            public boolean consumes() {
232                                    return handler.consumes();
233                            }
234            
235                            @Override
236                            public int getCardinality() {
237                                    return cardinality;
238                            }
239            
240                            @Override
241                            public C getContext() {
242                                    return handler.getContext();
243                            }
244            
245                            @Override
246                            public P getPriority() {
247                                    return handler.getPriority();
248                            }
249                            
250                            @Override
251                            public void post(
252                                            EventDispatchContext<E, P, C, H, S> context, 
253                                            InferenceContext<E,P,C,K,H,S> inferenceContext, 
254                                            Handle<E,P,C,K>... handles) {                     
255                                    try {
256                                            
257                                            if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
258    //                                              if (inferenceContext.getInferenceCommandsQueue()!=null) {
259    //                                                      throw new IllegalArgumentException("Inference commands queue shall be null");
260    //                                              }
261                                                    inferenceContext = inferenceContext.wrap();
262                                            }
263                                                                                    
264                                            joiner.addInput(idx, handles, new CompositeContext<E, P, C, K, H, S>(
265                                                            context, 
266                                                            handler.getContext(), 
267                                                            inferenceContext));     
268                                            
269                                    } catch (Exception e) {
270                                            logger.log(Level.SEVERE, "Join problem: "+e, e);
271                                            if (bus.getExceptionHandler()!=null) {
272                                                    bus.getExceptionHandler().handleException(e);
273                                            }
274                                            if (inferenceContext.getRootHandle()!=null) {
275                                                    inferenceContext.getRootHandle().handleException(e);
276                                            }
277                                    } finally {
278                                            if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
279                                                    inferenceContext.processInferenceCommands();
280                                            }                                                       
281                                    }
282                            }
283            
284                            @Override
285                            public void reset() {
286                                    joiner.reset();
287                                    handler.reset();
288                            }
289            
290                            @Override
291                            public EventHandler<E, P, C, H, S> getHandler() {
292                                    throw new UnsupportedOperationException();
293                            }
294            
295                            @Override
296                            public Set<K> getRegistrationKeys() {
297                                    return handler.getRegistrationKeys();
298                            }
299            
300                            @Override
301                            public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
302                                    if (taken.add(id)) {
303                                            snapshot.joinInput(id, joinNode.id, idx);
304                                            joinNode.takeSnapshot(snapshot, taken);
305                                    }
306                            }
307            
308                            @Override
309                            public Predicate<E, C> getPredicate() {
310                                    return predicate;
311                            }
312            
313                            @Override
314                            public String toString() {
315                                    return "JoinInputEventHandler [joiner=" + joiner + 
316                                            ", idx=" + idx + 
317                                            ", id=" + id + 
318                                            ", handler=" + handler + 
319                                            ", joinNode=" + joinNode + 
320                                            ", predicate=" + predicate + "]";
321                            }
322            
323                            @Override
324                            public boolean isOneOff() {
325                                    return handler==null ? joinNode.isOneOff() : handler.isOneOff();
326                            }
327            
328                            @Override
329                            public Mode getMode() {
330                                    return handler==null ? joinNode.getMode() : handler.getMode();
331                            }
332                                                            
333                    }
334    
335            /**
336                     * Joins several inputs, applies predicate(s) 
337                     * @author Pavel Vlasov
338                     *
339                     */
340                    protected class JoinNode {
341                            final Set<Integer> outputIndices;
342                            Predicate<E, C> predicate;
343                            EventBusJoiner joiner;
344                            EventHandlerWrapper<E, P, C, K, H, S> finalEventHandler;
345                            int finalCardinality;
346                            
347                            public JoinNode(int finalCardinality, PredicateNode pn) {
348                                    if (pn.userObject!=null) {
349                                            throw new EventBusException("User object is not null!");
350                                    }
351                                    this.outputIndices = pn.indexes;
352                                    this.predicate = pn.predicate;
353                                    pn.userObject = this;
354                                    this.finalCardinality = finalCardinality;
355                                    this.id = nextId();
356                            }
357                            
358                            JoinNode nxt;
359            //              List<JoinNode> inputNodes = new ArrayList<JoinNode>();
360            //              
361            //              List<JoinInput> joinInputs = new ArrayList<JoinInput>(); 
362                            
363                            // Maps input indexes to output
364                            Mapper<Handle<E,P,C,K>>[] outputMappers;
365                            
366                            // Maps input indexes to predicate
367                            Mapper<Handle<E,P,C,K>>[] predicateMappers;
368                            
369                            K id;
370                            
371                            void setNext(JoinNode next) {
372                                    nxt = next;
373            //                      nxt.inputNodes.add(this);
374                            }
375            
376                            void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
377                                    if (taken.add(id)) {
378                                            if (finalEventHandler!=null) {
379                                                    for (K hid: finalEventHandler.getRegistrationKeys()) {
380                                                            if (taken.add(hid)) {
381                                                                    snapshot.handler(hid, finalEventHandler.getHandler());
382                                                            }
383                                                    }
384                                            }
385                                            K hid = finalEventHandler==null ? null : finalEventHandler.getRegistrationKeys().iterator().next();
386                                            if (nxt!=null) {
387                                                    nxt.takeSnapshot(snapshot, taken);
388                                            }
389                                            snapshot.joinNode(id, predicate, outputIndices, hid, nxt==null ? null : nxt.id);
390                                            joiner.takeSnapshot(id, snapshot, taken);                               
391                                    }
392                            }
393            
394                            public boolean isOneOff() {
395                                    return nxt==null ? finalEventHandler.isOneOff() : nxt.isOneOff();
396                            }
397            
398                            public EventHandlerBase.Mode getMode() {
399                                    return nxt==null ? finalEventHandler.getMode() : nxt.getMode();
400                            }
401                    }
402    
403            /**
404             * Helper class for building join network.
405             * @author Pavel Vlasov
406             *
407             */
408            private class PredicateNode {
409                    
410                    int position;
411                    
412                    PredicateNode(Set<Integer> indexes) {
413                            this.indexes = indexes;
414                    }
415                    
416                    PredicateNode(int index) {
417                            indexes = Collections.singleton(index);
418                    }
419                    
420                    Set<Integer> indexes;
421                    Predicate<E,C> predicate;
422                    List<PredicateNode> inputs = new ArrayList<PredicateNode>();
423                    Object userObject;
424            }
425    
426            /**
427             * Root inference node with True predicate. Shall be initialized by subclass.
428             */
429            @SuppressWarnings("unchecked")
430            protected PredicatedInferenceNode<E, P, C, K, H, S> rootNode;             
431    
432            /**
433             * Constructs join network for multi-event handlers. 
434             * @param handler Handler
435             * @param predicate Predicate
436             * @return Collection of input handlers with cardinality 1.
437             * @throws CloneNotSupportedException 
438             */
439            @SuppressWarnings("unchecked")
440            private Collection<JoinInput> buildJoinNetwork(final EventHandlerWrapper<E, P, C, K, H, S> handler) {
441    
442                    Predicate<E,C>[] pa = new Predicate[] {handler.getPredicate()};           
443                    // Stupid, but Eclipse doesn't allow to create an array of JoinInput using new JoinInput[].
444                    JoinInput[] ret = (JoinInput[]) Array.newInstance(JoinInput.class, handler.getCardinality());                                           
445                    PredicateNode[] pna = (PredicateNode[]) Array.newInstance(PredicateNode.class, handler.getCardinality());
446                    // Initialization.
447                    for (int i=0; i<pna.length; ++i) {
448                            pna[i]=new PredicateNode(i);
449                            pna[i].predicate = extractIndexPath(Collections.singleton(i), pa);
450                            pna[i].position = i;
451    
452                            // Initialization
453                            ret[i] = new JoinInput(i);
454                            ret[i].predicate = pna[i].predicate;
455                            pna[i].userObject = ret[i];
456                    }
457                    
458                    while (size(pna)>1 && pa[0]!=null) {
459                            List<PredicateNode> collector = new ArrayList<PredicateNode>();
460                            permutate(pna, 0, collector);
461                            Collections.sort(collector, INDEX_SET_COMPARATOR);
462                            for (PredicateNode candidate: collector) {
463                                    candidate.predicate = extractIndexPath(candidate.indexes, pa);
464                                    if (candidate.predicate!=null) {
465                                            candidate.position = candidate.inputs.get(0).position;
466                                            for (PredicateNode input: candidate.inputs) {
467                                                    pna[input.position]=null;
468                                            }
469                                            pna[candidate.position]=candidate;
470                                            break;
471                                    }
472                            }
473                    }
474                    
475                    if (size(pna)>1) {
476                            // Create final predicate-less predicate node
477                            Set<Integer> allIndexes = new TreeSet<Integer>();
478                            for (int i=0; i<handler.getCardinality(); ++i) {
479                                    allIndexes.add(i);
480                            }
481                            PredicateNode finalPredicateNode = new PredicateNode(allIndexes);
482                            for (int i=0; i<pna.length; ++i) {
483                                    if (pna[i]!=null) {
484                                            finalPredicateNode.inputs.add(pna[i]);
485                                            pna[i]=null;
486                                    }
487                            }
488                            pna[0] = finalPredicateNode;
489                    }
490                    
491                    if (pa[0]!=null && !(pa[0] instanceof True)) {
492                            // Index-less predicate, add to final predicate node.
493                            if (pna[0].predicate == null) {
494                                    pna[0].predicate = pa[0];
495                            } else if (pna[0].predicate instanceof And) {
496                                    pna[0].predicate = ((And<E,C>) pna[0].predicate).add(pa[0]);
497                            } else if (pna[0].predicate instanceof CommutativeAnd) {
498                                    pna[0].predicate = ((CommutativeAnd<E,C>) pna[0].predicate).add(pa[0]);
499                            } else {
500                                    pna[0].predicate = new And<E,C>(0, null, pna[0].predicate, pa[0]);
501                            }
502                    }               
503                    
504                    for (PredicateNode pn: pna) {
505                            if (pn!=null) { // There should be only one such node.
506                                    wire(pn, handler, handler.getCardinality()).finalEventHandler=handler;
507                                    break;
508                            }
509                    }               
510                                                    
511                    return Arrays.asList(ret);
512            }
513            
514            /**
515             * Extracts path for specified indexes from the predicate, replaces predicate if extraction was successful.
516             * @param idx Index.
517             * @param pa Single element array with source predicate. If extraction is successful then source predicate gets replaced with 
518             * predicate without index parts.
519             * @param indexPath Index path to accumulate result.
520             * @return predicate for the specified index or null if extraction wasn't successful.
521             */
522            private Predicate<E,C> extractIndexPath(Set<Integer> indexes, Predicate<E, C>[] pa) {         
523                    
524                    if (pa[0] instanceof And) {
525                            List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
526                            List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((And<E,C>) pa[0]).getParts());
527                            double currentCost = 0;
528                            int ahead = 0;
529                            ListIterator<Predicate<E, C>> pit = parts.listIterator();
530                            Z: while (pit.hasNext()) {
531                                    Predicate<E, C> part = pit.next();
532                                    if (ahead==0) {
533                                            currentCost=part.getCost();
534                                    }
535                                    ++ahead;
536                                                                    
537                                    Set<Integer> parameterIndices = part.parameterIndices();
538                                    if (indexes.containsAll(parameterIndices)) {
539                                            // Extract only if all lower cost predicates before were extracted.
540                                            if (currentCost<part.getCost()) {
541                                                    continue;
542                                            }
543                                            // Exact matching index predicate - extract
544                                            indexPath.add(part);
545                                            pit.remove();
546                                            --ahead;
547                                    } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { 
548                                            // Multi-index predicate with matching index. Try to extract. Break if not possible.
549                                            @SuppressWarnings("unchecked")
550                                            Predicate<E, C>[] spa = new Predicate[] {part};
551                                            Predicate<E, C> subPart = extractIndexPath(indexes, spa);
552                                            if (subPart==null) {
553                                                    break;
554                                            } else {
555                                                    indexPath.add(subPart);
556                                                    if (spa[0]==null) {
557                                                            pit.remove();
558                                                            --ahead;
559                                                    } else {
560                                                            pit.set(spa[0]);
561                                                            currentCost=spa[0].getCost();
562                                                    }
563                                                    for (Integer idx: indexes) {
564                                                            if (spa[0].parameterIndices().contains(idx)) {
565                                                                    // Still has the index - break.
566                                                                    break Z;
567                                                            }
568                                                    }
569                                            }
570                                    }
571                            }                       
572                            
573                            if (indexPath.isEmpty()) {
574                                    return null;
575                            }
576                            
577                            switch (parts.size()) {
578                            case 0:
579                                    pa[0] = null;
580                                    break;
581                            case 1: 
582                                    pa[0] = parts.get(0);
583                                    break;
584                            default:
585                                    pa[0] = new And<E,C>(0, null, parts);
586                                    break;
587                            }
588                            
589                            return indexPath.size()==1 ? indexPath.get(0) : new And<E,C>(0, null, indexPath);                 
590                    } else if (pa[0] instanceof CommutativeAnd) {
591                            List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
592                            List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((CommutativeAnd<E,C>) pa[0]).getParts());
593                            double currentCost = 0;
594                            int ahead=0;
595                            ListIterator<Predicate<E, C>> pit = parts.listIterator();
596                            while (pit.hasNext()) {
597                                    Predicate<E, C> part = pit.next();                                
598                                    if (ahead==0) {
599                                            currentCost = part.getCost();
600                                    }
601                                    ++ahead;
602                                    
603                                    Set<Integer> parameterIndices = part.parameterIndices();
604                                    if (indexes.containsAll(parameterIndices)) { 
605                                            // Extract only if all lower cost predicates before were extracted.
606                                            if (currentCost<part.getCost()) {
607                                                    continue;
608                                            }
609                                            // Single matching index predicate - extract
610                                            indexPath.add(part);
611                                            pit.remove();
612                                            --ahead;
613                                    } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { 
614                                            // Multi-index predicate with matching index. Try to extract. 
615                                            @SuppressWarnings("unchecked")
616                                            Predicate<E, C>[] spa = new Predicate[] {part};
617                                            Predicate<E, C> subPart = extractIndexPath(indexes, spa);
618                                            if (subPart!=null) {
619                                                    indexPath.add(subPart);
620                                                    if (spa[0]==null) {
621                                                            pit.remove();
622                                                            --ahead;
623                                                    } else {
624                                                            pit.set(spa[0]);
625                                                            currentCost = spa[0].getCost();
626                                                    }
627                                            }
628                                    }
629                            }                       
630                            
631                            if (indexPath.isEmpty()) {
632                                    return null;
633                            }
634                            
635                            switch (parts.size()) {
636                            case 0:
637                                    pa[0] = null;
638                                    break;
639                            case 1: 
640                                    pa[0] = parts.get(0);
641                                    break;
642                            default:
643                                    pa[0] = new CommutativeAnd<E,C>(0, null, parts);
644                                    break;
645                            }
646                            
647                            return indexPath.size()==1 ? indexPath.get(0) : new CommutativeAnd<E,C>(0, null, indexPath);                      
648                    }
649                    
650                    if (indexes.containsAll(pa[0].parameterIndices())) {
651                            Predicate<E, C> ret = pa[0];
652                            pa[0] = null;
653                            return ret;
654                    }
655                    
656                    return null;
657            }
658    
659            /**
660             * Creates joiner which outputs to given event handler.
661             * @param jn
662             */
663            @SuppressWarnings("unchecked")
664            private JoinNode wire(PredicateNode pn, EventHandlerWrapper<E, P, C, K, H, S> handler, int finalCardinality) {
665                    JoinNode ret = new JoinNode(finalCardinality, pn);              
666                    
667                    // Output mappers
668                    ret.outputMappers = new Mapper[pn.inputs.size()];
669                    int[][] jind = new int[ret.outputMappers.length][];
670                    List<Integer> outputIndices = new ArrayList<Integer>(ret.outputIndices);
671                    Collections.sort(outputIndices);
672                    int idx = 0;
673                    for (PredicateNode ipn: pn.inputs) {
674                            if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
675                                    JoinInput ji = (JoinInput) ipn.userObject;
676                                    int jii = outputIndices.indexOf(ji.index);
677                                    if (jii==-1) {
678                                            throw new IllegalStateException("Join input index "+ji.index+" is not present in output indices "+outputIndices);
679                                    }
680                                    jind[idx] = new int[] {ji.index};
681                                    ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {jii});
682                            } else {
683                                    List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
684                                    Collections.sort(ioi);
685                                    int[] map = new int[ioi.size()];
686                                    Iterator<Integer> ioiit = ioi.iterator();
687                                    for (int i=0; ioiit.hasNext(); ++i) {
688                                            int oi = ioiit.next();
689                                            int jii = outputIndices.indexOf(oi);
690                                            if (jii==-1) {
691                                                    throw new IllegalStateException("Input node index "+oi+" at offset "+i+" is not present in output indices "+outputIndices);
692                                            }
693                                            map[i]=jii;
694                                    }
695                                    jind[idx] = new int[ioi.size()];
696                                    for (int i=0; i<jind.length; ++i) {
697                                            jind[idx][i] = ioi.get(i);
698                                    }
699                                    ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
700                            }
701                    }
702                    
703                    MappingEventHandler<E, P, C, K, H, S> meh = new MappingEventHandler<E, P, C, K, H, S>(handler, ret.outputMappers);
704                    ret.joiner = createJoiner(meh, jind);
705                                    
706                    // Predicate mappers.
707                    ret.predicateMappers = new Mapper[pn.inputs.size()];
708                    idx = 0;
709                    for (PredicateNode ipn: pn.inputs) {
710                            if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
711                                    JoinInput ji = (JoinInput) ipn.userObject;
712                                    ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {ji.index});
713                            } else {
714                                    List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
715                                    Collections.sort(ioi);
716                                    int [] map = new int[ioi.size()];
717                                    Iterator<Integer> ioiit = ioi.iterator();
718                                    for (int i=0; ioiit.hasNext(); ++i) {
719                                            map[i]=ioiit.next();
720                                    }
721                                    ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
722                            }
723                    }
724                    
725                    if (ret.predicate instanceof And || ret.predicate instanceof CommutativeAnd) {
726                            for (Predicate<E, C> p: ((CompositePredicate<E,?,C,?>) ret.predicate).getParts()) {
727                                    ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(p, ret.predicateMappers, bus.getEventType()));                                
728                            }
729                    } else if (ret.predicate!=null) {
730                            ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(ret.predicate, ret.predicateMappers, bus.getEventType()));
731                    }
732                    
733                    for (int i=0, l=pn.inputs.size(); i<l; ++i) {                        
734                            PredicateNode ipn = pn.inputs.get(i);
735                            if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
736                                    JoinInput joinInput = (JoinInput) ipn.userObject;
737                                    joinInput.joinNode = ret;
738                                    Predicate<E, C> jp;
739                                    if (joinInput.predicate==null) {
740                                            jp = True.getInstance(); 
741                                    } else {
742                                            jp = MappedPredicate.mapPredicate(joinInput.predicate, joinInput.map);                                                  
743                                    }
744            
745                                    joinInput.handler = new JoinInputEventHandler(ret, i, handler, jp, 1);
746                            } else {
747                                    wire(ipn, new JoinInputEventHandler(ret, i, handler, null, ipn.inputs.size()), finalCardinality).nxt=ret;
748                            }
749                    }
750                    return ret;
751            }
752    
753            @Override
754            public void removeHandlers(Iterable<K> keys) {
755                    getLock().writeLock().lock();
756                    try {
757                            rootNode.remove(null, null, true, keys);
758                    } finally {
759                            getLock().writeLock().unlock();
760                    }               
761            }
762                    
763            private void permutate(PredicateNode[] pna, int offset, List<PredicateNode> collector) {
764            if (offset==pna.length) {
765                    Set<Integer> set = new TreeSet<Integer>();
766                    int joinCount = 0;
767                    for (int i=0; i<pna.length; ++i) {
768                        if (pna[i]!=null) {
769                            set.addAll(pna[i].indexes);
770                            ++joinCount;
771                        }
772                    }
773                    if (set.size()>1 && joinCount>1) {
774                            PredicateNode pn = new PredicateNode(set);
775                            for (PredicateNode input: pna) {
776                                    if (input!=null) {
777                                            pn.inputs.add(input);
778                                    }
779                            }
780                        collector.add(pn);
781                    }
782            } else {
783                PredicateNode pn = pna[offset];
784                permutate(pna, offset+1, collector);
785                if (pn!=null) {
786                    pna[offset] = null;
787                    permutate(pna, offset+1, collector);
788                    pna[offset] = pn;
789                }
790            }
791        }
792            
793        private final Comparator<PredicateNode> INDEX_SET_COMPARATOR = new Comparator<PredicateNode>() {
794    
795            @Override
796            public int compare(PredicateNode p0, PredicateNode p1) {                
797                    double cost0 = p0.predicate==null ? 0 : p0.predicate.getCost();
798                            double cost1 = p1.predicate==null ? 0 : p1.predicate.getCost();
799                            double costDelta = cost0 - cost1;
800                    if (costDelta<0) {
801                            return -1;
802                    }
803                    if (costDelta>0) {
804                            return 1;
805                    }
806                int sizeDelta = p0.indexes.size() - p1.indexes.size();
807                if (sizeDelta!=0) {
808                        return sizeDelta;
809                }
810    
811                for (int i: p0.indexes) {
812                        for (int j: p1.indexes) {
813                                if (i!=j) {
814                                        return i-j;
815                                }
816                        }
817                }
818    
819                return p0.hashCode() - p1.hashCode();
820            }
821    
822        };
823            private EventBus<E, P, C, K, H, S> bus;
824    
825        private int size(PredicateNode[] pna) {
826            int ret=0;
827            for (PredicateNode pn: pna) {
828                    if (pn!=null) {
829                            ++ret;
830                    }
831            }
832            return ret;
833        }
834            
835            public void reset() {
836                    getLock().writeLock().lock();
837                    try {
838                            rootNode.reset();
839                    } finally {
840                            getLock().writeLock().unlock();
841                    }
842            }
843            
844            @Override
845            public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
846                    getLock().writeLock().lock();
847                    try {
848                            handlerManager.manageHandlers(this);
849                    } finally {
850                            getLock().writeLock().unlock();
851                    }
852            }
853            
854            protected abstract PredicatedInferenceNode<E, P, C, K, H, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, K, H, S> parent, Predicate<E, C> predicate, C context);
855            
856            protected abstract Set<K> extractHandlerIds(EventHandlerWrapper<E, P, C, K, H, S> handler);
857    
858            /**
859             * Generates handler ID.
860             * @return
861             */
862            protected abstract K nextId();
863            
864            /**
865             * Factory method for handler wrapper.
866             * @param eventHandler
867             * @param oneOff
868             * @return
869             */
870            protected abstract EventHandlerWrapper<E, P, C, K, H, S> wrap(EventHandler<E, P, C, H, S> eventHandler,     K registrationKey);
871            
872            protected abstract class EventBusJoiner extends Joiner<Handle<E,P,C,K>[], CompositeContext<E, P, C, K, H, S>, Object> {
873    
874                    
875                    protected EventBusJoiner(
876                                    Collector<Handle<E,P,C,K>[]>[] inputCollectors,
877                                    Class<Handle<E,P,C,K>[]> inputType, 
878                                    boolean outerJoin) {
879                            
880                            super(inputCollectors, inputType, outerJoin);
881                    }
882    
883                    protected abstract void takeSnapshot(K joinNodeId, Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken);
884                    
885            }
886            
887            protected abstract EventBusJoiner createJoiner(JoinEventHandler<E, P, C, K, H, S> handler, int[][] indices);
888    
889            @Override
890            public void setEventBus(EventBus<E, P, C, K, H, S> bus) {
891                    this.bus = bus;         
892            }
893            
894            public EventBus<E, P, C, K, H, S> getEventBus() {
895                    return bus;
896            }
897    }