EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus]

COVERAGE SUMMARY FOR SOURCE FILE [PredicateChainingMatcher.java]

name | class, % | method, % | block, % | line, %
PredicateChainingMatcher.java | 100% (8/8) | 74%  (34/46) | 76%  (1642/2157) | 79%  (316.9/403)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class PredicateChainingMatcher$JoinInputEventHandler100% (1/1)60%  (9/15)47%  (117/250)55%  (27.6/50)
getHandler (): EventHandler 0%   (0/1)0%   (0/4)0%   (0/1)
getMode (): EventHandlerBase$Mode 0%   (0/1)0%   (0/11)0%   (0/1)
getRegistrationKeys (): Set 0%   (0/1)0%   (0/4)0%   (0/1)
isOneOff (): boolean 0%   (0/1)0%   (0/11)0%   (0/1)
reset (): void 0%   (0/1)0%   (0/7)0%   (0/3)
toString (): String 0%   (0/1)0%   (0/36)0%   (0/6)
post (EventDispatchContext, InferenceContext, AbstractEventBus$Handle []): void 100% (1/1)42%  (44/104)45%  (7.6/17)
PredicateChainingMatcher$JoinInputEventHandler (PredicateChainingMatcher, Pre... 100% (1/1)100% (32/32)100% (10/10)
consumes (): boolean 100% (1/1)100% (4/4)100% (1/1)
getCardinality (): int 100% (1/1)100% (3/3)100% (1/1)
getContext (): Object 100% (1/1)100% (4/4)100% (1/1)
getId (): Object 100% (1/1)100% (3/3)100% (1/1)
getPredicate (): Predicate 100% (1/1)100% (3/3)100% (1/1)
getPriority (): Comparable 100% (1/1)100% (4/4)100% (1/1)
takeSnapshot (AbstractEventBus$Snapshot, Set): void 100% (1/1)100% (20/20)100% (4/4)
     
class PredicateChainingMatcher$2100% (1/1)100% (2/2)62%  (20/32)78%  (7/9)
compare (EventHandlerWrapper, EventHandlerWrapper): int 100% (1/1)54%  (14/26)71%  (5/7)
PredicateChainingMatcher$2 (PredicateChainingMatcher): void 100% (1/1)100% (6/6)100% (2/2)
     
class PredicateChainingMatcher$JoinNode100% (1/1)40%  (2/5)77%  (102/133)79%  (19/24)
getMode (): EventHandlerBase$Mode 0%   (0/1)0%   (0/11)0%   (0/1)
isOneOff (): boolean 0%   (0/1)0%   (0/11)0%   (0/1)
setNext (PredicateChainingMatcher$JoinNode): void 0%   (0/1)0%   (0/4)0%   (0/2)
PredicateChainingMatcher$JoinNode (PredicateChainingMatcher, int, PredicateCh... 100% (1/1)84%  (27/32)89%  (8/9)
takeSnapshot (AbstractEventBus$Snapshot, Set): void 100% (1/1)100% (75/75)100% (11/11)
     
class PredicateChainingMatcher100% (1/1)83%  (15/18)80%  (1268/1591)82%  (238.7/292)
access$0 (): Logger 0%   (0/1)0%   (0/2)0%   (0/1)
access$1 (PredicateChainingMatcher): EventBus 0%   (0/1)0%   (0/3)0%   (0/1)
manageHandlers (Matcher$HandlerManager): void 0%   (0/1)0%   (0/20)0%   (0/6)
addInternal (EventHandlerWrapper): void 100% (1/1)43%  (40/93)64%  (7/11)
match (Object, ExecutorService): Iterable 100% (1/1)60%  (94/156)64%  (18.6/29)
reset (): void 100% (1/1)65%  (13/20)57%  (3.4/6)
addHandler (EventHandler): Object 100% (1/1)68%  (25/37)64%  (5.8/9)
takeSnapshot (AbstractEventBus$Snapshot): void 100% (1/1)71%  (17/24)57%  (3.4/6)
removeHandlers (Iterable): void 100% (1/1)72%  (18/25)57%  (3.4/6)
buildJoinNetwork (EventHandlerWrapper): Collection 100% (1/1)74%  (237/320)83%  (37.4/45)
wire (PredicateChainingMatcher$PredicateNode, EventHandlerWrapper, int): Pred... 100% (1/1)91%  (368/406)95%  (56/59)
extractIndexPath (Set, Predicate []): Predicate 100% (1/1)91%  (312/341)88%  (73/83)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
PredicateChainingMatcher (): void 100% (1/1)100% (9/9)100% (2/2)
getEventBus (): EventBus 100% (1/1)100% (3/3)100% (1/1)
permutate (PredicateChainingMatcher$PredicateNode [], int, List): void 100% (1/1)100% (100/100)100% (20/20)
setEventBus (EventBus): void 100% (1/1)100% (4/4)100% (2/2)
size (PredicateChainingMatcher$PredicateNode []): int 100% (1/1)100% (23/23)100% (5/5)
     
class PredicateChainingMatcher$1100% (1/1)100% (2/2)82%  (75/91)80%  (13.6/17)
compare (PredicateChainingMatcher$PredicateNode, PredicateChainingMatcher$Pre... 100% (1/1)81%  (69/85)77%  (11.6/15)
PredicateChainingMatcher$1 (PredicateChainingMatcher): void 100% (1/1)100% (6/6)100% (2/2)
     
class PredicateChainingMatcher$EventBusJoiner100% (1/1)100% (1/1)100% (9/9)100% (2/2)
PredicateChainingMatcher$EventBusJoiner (PredicateChainingMatcher, Joiner$Col... 100% (1/1)100% (9/9)100% (2/2)
     
class PredicateChainingMatcher$JoinInput100% (1/1)100% (1/1)100% (21/21)100% (5/5)
PredicateChainingMatcher$JoinInput (PredicateChainingMatcher, int): void 100% (1/1)100% (21/21)100% (5/5)
     
class PredicateChainingMatcher$PredicateNode100% (1/1)100% (2/2)100% (30/30)100% (7/7)
PredicateChainingMatcher$PredicateNode (PredicateChainingMatcher, Set): void 100% (1/1)100% (14/14)100% (4/4)
PredicateChainingMatcher$PredicateNode (PredicateChainingMatcher, int): void 100% (1/1)100% (16/16)100% (4/4)

1package com.hammurapi.eventbus;
2 
3import java.lang.reflect.Array;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.Comparator;
9import java.util.HashMap;
10import java.util.HashSet;
11import java.util.Iterator;
12import java.util.LinkedList;
13import java.util.List;
14import java.util.ListIterator;
15import java.util.Map;
16import java.util.Set;
17import java.util.TreeSet;
18import java.util.concurrent.ExecutionException;
19import java.util.concurrent.ExecutorService;
20import java.util.concurrent.Future;
21import java.util.concurrent.atomic.AtomicReference;
22import java.util.concurrent.locks.ReadWriteLock;
23import java.util.logging.Level;
24import java.util.logging.Logger;
25 
26import com.hammurapi.common.FreezeableCollection;
27import com.hammurapi.common.Joiner;
28import com.hammurapi.common.concurrent.TrackingExecutorService;
29import com.hammurapi.eventbus.AbstractEventBus.Handle;
30import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
31import com.hammurapi.extract.And;
32import com.hammurapi.extract.CommutativeAnd;
33import com.hammurapi.extract.CommutativeOr;
34import com.hammurapi.extract.CompositePredicate;
35import com.hammurapi.extract.Extractor;
36import com.hammurapi.extract.False;
37import com.hammurapi.extract.MappedPredicate;
38import com.hammurapi.extract.Predicate;
39import com.hammurapi.extract.True;
40 
41public abstract class PredicateChainingMatcher<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements Matcher<E, P, C, K, H, S> {
42        private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
43 
44        /**
45         * Creates a wrapper around the master executor service for task tracking purposes.
46         * @param master Master executor service
           * @param oneOff Meaning is defined by the implementation; {@link #match} passes <code>true</code>
           *        for its handler collection executor (presumably "single use" - TODO confirm).
           * @param name Name of the wrapper; {@link #match} passes "Handler collector"
           *        (presumably used for diagnostics - TODO confirm).
47         * @return Tracking executor service. {@link #match} treats a <code>null</code> return value as
           *         a request to collect handlers in the calling thread instead of using an executor.
48         */
49        protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
50        
51        @Override
52        public Iterable<EventHandlerWrapper<E, P, C, K, H, S>> match(E event, ExecutorService executorService) {
53                final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
54                if (fineIsLoggable) {
55                        logger.fine("Collecting handlers to fire for event "+event);
56                }
57 
58                List<EventHandlerWrapper<E, P, C, K, H, S>> handlers = new ArrayList<EventHandlerWrapper<E,P,C,K,H,S>>();
59                Map<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>>();
60                
61                getLock().readLock().lock();
62                try {
63                        TrackingExecutorService ces = createExecutorService(executorService, true, "Handler collector");
64                        if (ces == null) {
65                                rootNode.collectHandlers(cache, handlers, event);
66                        } else {
67                                // Reference for robustness - unset when done. 
68                                FreezeableCollection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>> collector = new FreezeableCollection<Future<Collection<EventHandlerWrapper<E,P,C,K,H,S>>>>(Collections.synchronizedCollection(new ArrayList<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>()));
69                                AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>> collectorRef = new AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>>(Collections.synchronizedCollection(collector)); 
70                                rootNode.collectHandlers(cache, ces, collectorRef, event);
71                                try {
72                                        ces.join();
73                                } catch (InterruptedException ie) {
74                                        throw new EventDispatchException(ie);
75                                }
76 
77                                collector.freeze();
78                                for (Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>> future: collectorRef.get()) { // To catch errors when handlers are added after join()
79                                        try {
80                                                handlers.addAll(future.get());
81                                        } catch (ExecutionException e) {
82                                                throw new EventDispatchException("Problem collecting handlers to fire: "+e, e);
83                                        } catch (InterruptedException e) {
84                                                throw new EventDispatchException("Collecting handlers to fire has been interrupted: "+e, e);
85                                        }
86                                }
87                        }
88                } finally {
89                        getLock().readLock().unlock();
90                }
91                
92                // Sort handlers before execution.
93                Collections.sort(handlers, new Comparator<EventHandlerWrapper<E, P, C, K, H, S>>() {
94 
95                        @Override
96                        public int compare(EventHandlerWrapper<E, P, C, K, H, S> o1, EventHandlerWrapper<E, P, C, K, H, S> o2) {                                
97                                P p1 = o1.getPriority();
98                                P p2 = o2.getPriority();
99                                if (p2==null) {
100                                        return p1==null ? o2.hashCode() - o1.hashCode() : 1; 
101                                }
102                                if (p1==null) {
103                                        return -1;
104                                }
105                                return p2.compareTo(p1);
106                        }
107                        
108                });
109                
110                if (fineIsLoggable) {
111                        logger.fine("Collected "+handlers.size()+" handlers to fire: "+handlers);
112                }
113                
114                return handlers;
115        }
116 
117        @Override
118        public K addHandler(EventHandler<E, P, C, H, S> eventHandler) {
119                if (eventHandler.getCardinality()<1) {
120                        throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
121                }
122                getLock().writeLock().lock();
123                try {
124                        final K ret = nextId();                                                
125                        addInternal(wrap(eventHandler, ret));                        
126                        return ret;
127                } finally {
128                        getLock().writeLock().unlock();
129                }
130        }
131        
132        /**
133         * Lock for the inference network.
            * {@link #match} holds the read lock while collecting handlers; {@link #addHandler} and
            * {@link #takeSnapshot} hold the write lock. The lock is obtained through separate
            * <code>getLock()</code> calls for locking and unlocking, so every call needs to return the same lock.
134         * @return Read/write lock guarding the network.
135         */
136        protected abstract ReadWriteLock getLock();
137 
138        @Override
139        public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
140                getLock().writeLock().lock(); // Locking the bus for writing and as such for reading.
141                try {
142                        // Step 2 Dispatch network & join collections.
143                        rootNode.takeSnapshot(snapshot, new HashSet<K>());
144                } finally {
145                        getLock().writeLock().unlock(); // Done!
146                }
147        }
148        
149        private void addInternal(EventHandlerWrapper<E, P, C, K, H, S> wrappedHandler) {
150                if (wrappedHandler.getPredicate() instanceof CommutativeOr) {
151                        for (Predicate<E, C> part: ((CommutativeOr<E, C>) wrappedHandler.getPredicate()).getParts()) {
152                                addInternal(new EventHandlerWrapperFilter<E, P, C, K, H, S>(wrappedHandler, part));
153                        }
154                } else {
155                        // Don't bother if always false
156                        if (!False.getInstance().equals(wrappedHandler.getPredicate())) {
157                                if (wrappedHandler.getCardinality()==1) {
158                                        if (!rootNode.addHandler(wrappedHandler, null)) {
159                                                throw new DispatchNetworkException("Predicate "+wrappedHandler.getPredicate()+" cannot be added to the root node, handler "+wrappedHandler.getHandler());
160                                        }
161                                } else {
162                                        for (JoinInput ji: buildJoinNetwork(wrappedHandler)) {                                                
163                                                if (!rootNode.addHandler(ji.handler, null)) {
164                                                        throw new DispatchNetworkException("Predicate "+ji.predicate+" cannot be added to the root node, for join handler");                                                        
165                                                }
166                                        }
167                                }
168                        }
169                }
170        }
171        
172        protected class JoinInput {
173                
174                K id;
175                
176                public JoinInput(int index) {
177                        this.index = index;
178                        this.id = nextId();
179                        this.map = new int[] {index};
180                }
181                
182                EventHandlerWrapper<E, P, C, K, H, S> handler;
183                
184                Predicate<E, C> predicate;
185 
186                JoinNode joinNode;
187                
188                final int index;
189                
190                final int[] map;
191                
192        }
193        
194        //        private JoinNode createJoinNode(PredicateNode pn,
195        //                        EventHandlerWrapper<E, P, C, K, H> handler) {
196        //                // TODO Auto-generated method stub
197        //                return null;
198        //        }
199                        
200                protected class JoinInputEventHandler implements EventHandlerWrapper<E, P, C, K, H, S> {
201                        
202                        private EventBusJoiner joiner;
203                        private int idx;
204                        final K id = nextId();
205                        private EventHandlerWrapper<E, P, C, K, H, S> handler;
206                        private JoinNode joinNode;
207                        private Predicate<E, C> predicate;
208                        private int cardinality;
209        
210                        JoinInputEventHandler(
211                                        JoinNode joinNode, 
212                                        int idx,                                 
213                                        EventHandlerWrapper<E, P, C, K, H, S> handler,
214                                        Predicate<E, C> predicate,
215                                        int cardinality) {
216                                
217                                this.joiner = joinNode.joiner;
218                                this.joinNode = joinNode;
219                                this.idx = idx;
220                                this.handler = handler;
221                                this.joinNode = joinNode;        
222                                this.predicate = predicate;
223                                this.cardinality = cardinality;
224                        }
225                        
226                        public K getId() {
227                                return id;
228                        }
229        
230                        @Override
231                        public boolean consumes() {
232                                return handler.consumes();
233                        }
234        
235                        @Override
236                        public int getCardinality() {
237                                return cardinality;
238                        }
239        
240                        @Override
241                        public C getContext() {
242                                return handler.getContext();
243                        }
244        
245                        @Override
246                        public P getPriority() {
247                                return handler.getPriority();
248                        }
249                        
250                        @Override
251                        public void post(
252                                        EventDispatchContext<E, P, C, H, S> context, 
253                                        InferenceContext<E,P,C,K,H,S> inferenceContext, 
254                                        Handle<E,P,C,K>... handles) {                        
255                                try {
256                                        
257                                        if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
258//                                                if (inferenceContext.getInferenceCommandsQueue()!=null) {
259//                                                        throw new IllegalArgumentException("Inference commands queue shall be null");
260//                                                }
261                                                inferenceContext = inferenceContext.wrap();
262                                        }
263                                                                                
264                                        joiner.addInput(idx, handles, new CompositeContext<E, P, C, K, H, S>(
265                                                        context, 
266                                                        handler.getContext(), 
267                                                        inferenceContext));        
268                                        
269                                } catch (Exception e) {
270                                        logger.log(Level.SEVERE, "Join problem: "+e, e);
271                                        if (bus.getExceptionHandler()!=null) {
272                                                bus.getExceptionHandler().handleException(e);
273                                        }
274                                        if (inferenceContext.getRootHandle()!=null) {
275                                                inferenceContext.getRootHandle().handleException(e);
276                                        }
277                                } finally {
278                                        if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
279                                                inferenceContext.processInferenceCommands();
280                                        }                                                        
281                                }
282                        }
283        
284                        @Override
285                        public void reset() {
286                                joiner.reset();
287                                handler.reset();
288                        }
289        
290                        @Override
291                        public EventHandler<E, P, C, H, S> getHandler() {
292                                throw new UnsupportedOperationException();
293                        }
294        
295                        @Override
296                        public Set<K> getRegistrationKeys() {
297                                return handler.getRegistrationKeys();
298                        }
299        
300                        @Override
301                        public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
302                                if (taken.add(id)) {
303                                        snapshot.joinInput(id, joinNode.id, idx);
304                                        joinNode.takeSnapshot(snapshot, taken);
305                                }
306                        }
307        
308                        @Override
309                        public Predicate<E, C> getPredicate() {
310                                return predicate;
311                        }
312        
313                        @Override
314                        public String toString() {
315                                return "JoinInputEventHandler [joiner=" + joiner + 
316                                        ", idx=" + idx + 
317                                        ", id=" + id + 
318                                        ", handler=" + handler + 
319                                        ", joinNode=" + joinNode + 
320                                        ", predicate=" + predicate + "]";
321                        }
322        
323                        @Override
324                        public boolean isOneOff() {
325                                return handler==null ? joinNode.isOneOff() : handler.isOneOff();
326                        }
327        
328                        @Override
329                        public Mode getMode() {
330                                return handler==null ? joinNode.getMode() : handler.getMode();
331                        }
332                                                        
333                }
334 
335        /**
336                 * Joins several inputs, applies predicate(s) 
337                 * @author Pavel Vlasov
338                 *
339                 */
340                protected class JoinNode {
341                        final Set<Integer> outputIndices;
342                        Predicate<E, C> predicate;
343                        EventBusJoiner joiner;
344                        EventHandlerWrapper<E, P, C, K, H, S> finalEventHandler;
345                        int finalCardinality;
346                        
347                        public JoinNode(int finalCardinality, PredicateNode pn) {
348                                if (pn.userObject!=null) {
349                                        throw new EventBusException("User object is not null!");
350                                }
351                                this.outputIndices = pn.indexes;
352                                this.predicate = pn.predicate;
353                                pn.userObject = this;
354                                this.finalCardinality = finalCardinality;
355                                this.id = nextId();
356                        }
357                        
358                        JoinNode nxt;
359        //                List<JoinNode> inputNodes = new ArrayList<JoinNode>();
360        //                
361        //                List<JoinInput> joinInputs = new ArrayList<JoinInput>(); 
362                        
363                        // Maps input indexes to output
364                        Mapper<Handle<E,P,C,K>>[] outputMappers;
365                        
366                        // Maps input indexes to predicate
367                        Mapper<Handle<E,P,C,K>>[] predicateMappers;
368                        
369                        K id;
370                        
371                        void setNext(JoinNode next) {
372                                nxt = next;
373        //                        nxt.inputNodes.add(this);
374                        }
375        
376                        void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
377                                if (taken.add(id)) {
378                                        if (finalEventHandler!=null) {
379                                                for (K hid: finalEventHandler.getRegistrationKeys()) {
380                                                        if (taken.add(hid)) {
381                                                                snapshot.handler(hid, finalEventHandler.getHandler());
382                                                        }
383                                                }
384                                        }
385                                        K hid = finalEventHandler==null ? null : finalEventHandler.getRegistrationKeys().iterator().next();
386                                        if (nxt!=null) {
387                                                nxt.takeSnapshot(snapshot, taken);
388                                        }
389                                        snapshot.joinNode(id, predicate, outputIndices, hid, nxt==null ? null : nxt.id);
390                                        joiner.takeSnapshot(id, snapshot, taken);                                
391                                }
392                        }
393        
394                        public boolean isOneOff() {
395                                return nxt==null ? finalEventHandler.isOneOff() : nxt.isOneOff();
396                        }
397        
398                        public EventHandlerBase.Mode getMode() {
399                                return nxt==null ? finalEventHandler.getMode() : nxt.getMode();
400                        }
401                }
402 
403        /**
404         * Helper class for building join network.
405         * @author Pavel Vlasov
406         *
407         */
408        private class PredicateNode {
409                
410                int position;
411                
412                PredicateNode(Set<Integer> indexes) {
413                        this.indexes = indexes;
414                }
415                
416                PredicateNode(int index) {
417                        indexes = Collections.singleton(index);
418                }
419                
420                Set<Integer> indexes;
421                Predicate<E,C> predicate;
422                List<PredicateNode> inputs = new ArrayList<PredicateNode>();
423                Object userObject;
424        }
425 
426        /**
427         * Root inference node with True predicate. Shall be initialized by subclass.
            * It is the entry point of the dispatch network in this class: handlers and join inputs are
            * added to it, matching collects handlers from it and snapshots are taken starting from it.
            * This class dereferences it without a null check.
428         */
429        @SuppressWarnings("unchecked")
430        protected PredicatedInferenceNode<E, P, C, K, H, S> rootNode;                
431 
432        /**
433         * Constructs join network for multi-event handlers. 
            * One join input and one predicate node is created per handler parameter index. Predicate nodes are
            * then combined while predicates can be extracted for the combined index sets, the remaining node is
            * wired and the handler becomes the final event handler of the join node returned by wire().
434         * @param handler Handler
435         *        to build the network for; its predicate and cardinality drive the construction.
436         * @return Collection of input handlers with cardinality 1.
437         *         The collection is a fixed-size list with one join input per parameter index, in index order.
438         */
439        @SuppressWarnings("unchecked")
440        private Collection<JoinInput> buildJoinNetwork(final EventHandlerWrapper<E, P, C, K, H, S> handler) {
441 
                   // Single element holder: extractIndexPath() replaces the element with the predicate
                   // which remains after extraction.
442                Predicate<E,C>[] pa = new Predicate[] {handler.getPredicate()};                
443                // JoinInput and PredicateNode are inner classes of a generic class, arrays of them are created reflectively instead of with new JoinInput[].
444                JoinInput[] ret = (JoinInput[]) Array.newInstance(JoinInput.class, handler.getCardinality());                                                
445                PredicateNode[] pna = (PredicateNode[]) Array.newInstance(PredicateNode.class, handler.getCardinality());
446                // Initialization: one single-index predicate node per parameter index.
447                for (int i=0; i<pna.length; ++i) {
448                        pna[i]=new PredicateNode(i);
449                        pna[i].predicate = extractIndexPath(Collections.singleton(i), pa);
450                        pna[i].position = i;
451 
452                        // Initialization of the matching join input, which becomes the user object of the predicate node.
453                        ret[i] = new JoinInput(i);
454                        ret[i].predicate = pna[i].predicate;
455                        pna[i].userObject = ret[i];
456                }
457                
                   // Combine nodes while more than one is left and there is still a predicate to extract from.
458                while (size(pna)>1 && pa[0]!=null) {
459                        List<PredicateNode> collector = new ArrayList<PredicateNode>();
                           // permutate() fills the collector with candidate nodes; candidates are tried in INDEX_SET_COMPARATOR order.
460                        permutate(pna, 0, collector);
461                        Collections.sort(collector, INDEX_SET_COMPARATOR);
462                        for (PredicateNode candidate: collector) {
463                                candidate.predicate = extractIndexPath(candidate.indexes, pa);
464                                if (candidate.predicate!=null) {
                                           // First candidate with an extractable predicate wins: it replaces its inputs
                                           // in the array and takes the position of its first input.
465                                        candidate.position = candidate.inputs.get(0).position;
466                                        for (PredicateNode input: candidate.inputs) {
467                                                pna[input.position]=null;
468                                        }
469                                        pna[candidate.position]=candidate;
470                                        break;
471                                }
472                        }
                           // NOTE(review): if no candidate yields a predicate, nothing visible here changes pna;
                           // termination then depends on extractIndexPath()/permutate() - TODO confirm.
473                }
474                
475                if (size(pna)>1) {
476                         // Create final predicate-less predicate node
                            // covering all indexes, with all remaining nodes as its inputs.
477                        Set<Integer> allIndexes = new TreeSet<Integer>();
478                        for (int i=0; i<handler.getCardinality(); ++i) {
479                                allIndexes.add(i);
480                        }
481                        PredicateNode finalPredicateNode = new PredicateNode(allIndexes);
482                        for (int i=0; i<pna.length; ++i) {
483                                if (pna[i]!=null) {
484                                        finalPredicateNode.inputs.add(pna[i]);
485                                        pna[i]=null;
486                                }
487                        }
488                        pna[0] = finalPredicateNode;
489                }
490                
491                if (pa[0]!=null && !(pa[0] instanceof True)) {
492                        // Index-less predicate, add to final predicate node.
                           // NOTE(review): the remaining node is taken from pna[0] here, whereas the wiring loop
                           // below searches for the first non-null element - assumes the remaining node is at position 0.
493                        if (pna[0].predicate == null) {
494                                pna[0].predicate = pa[0];
495                        } else if (pna[0].predicate instanceof And) {
496                                pna[0].predicate = ((And<E,C>) pna[0].predicate).add(pa[0]);
497                        } else if (pna[0].predicate instanceof CommutativeAnd) {
498                                pna[0].predicate = ((CommutativeAnd<E,C>) pna[0].predicate).add(pa[0]);
499                        } else {
500                                pna[0].predicate = new And<E,C>(0, null, pna[0].predicate, pa[0]);
501                        }
502                }                
503                
504                for (PredicateNode pn: pna) {
505                        if (pn!=null) { // There should be only one such node.
                                   // Wire the remaining node; the handler is set as the final event handler of the result of wire().
506                                wire(pn, handler, handler.getCardinality()).finalEventHandler=handler;
507                                break;
508                        }
509                }                
510                                                
511                return Arrays.asList(ret);
512        }
513        
514        /**
515         * Extracts path for specified indexes from the predicate, replaces predicate if extraction was successful.
516         * @param idx Index.
517         * @param pa Single element array with source predicate. If extraction is successful then source predicate gets replaced with 
518         * predicate without index parts.
519         * @param indexPath Index path to accumulate result.
520         * @return predicate for the specified index or null if extraction wasn't successful.
521         */
522        private Predicate<E,C> extractIndexPath(Set<Integer> indexes, Predicate<E, C>[] pa) {                
523                
524                if (pa[0] instanceof And) {
525                        List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
526                        List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((And<E,C>) pa[0]).getParts());
527                        double currentCost = 0;
528                        int ahead = 0;
529                        ListIterator<Predicate<E, C>> pit = parts.listIterator();
530                        Z: while (pit.hasNext()) {
531                                Predicate<E, C> part = pit.next();
532                                if (ahead==0) {
533                                        currentCost=part.getCost();
534                                }
535                                ++ahead;
536                                                                
537                                Set<Integer> parameterIndices = part.parameterIndices();
538                                if (indexes.containsAll(parameterIndices)) {
539                                        // Extract only if all lower cost predicates before were extracted.
540                                        if (currentCost<part.getCost()) {
541                                                continue;
542                                        }
543                                        // Exact matching index predicate - extract
544                                        indexPath.add(part);
545                                        pit.remove();
546                                        --ahead;
547                                } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { 
548                                        // Multi-index predicate with matching index. Try to extract. Break if not possible.
549                                        @SuppressWarnings("unchecked")
550                                        Predicate<E, C>[] spa = new Predicate[] {part};
551                                        Predicate<E, C> subPart = extractIndexPath(indexes, spa);
552                                        if (subPart==null) {
553                                                break;
554                                        } else {
555                                                indexPath.add(subPart);
556                                                if (spa[0]==null) {
557                                                        pit.remove();
558                                                        --ahead;
559                                                } else {
560                                                        pit.set(spa[0]);
561                                                        currentCost=spa[0].getCost();
562                                                }
563                                                for (Integer idx: indexes) {
564                                                        if (spa[0].parameterIndices().contains(idx)) {
565                                                                // Still has the index - break.
566                                                                break Z;
567                                                        }
568                                                }
569                                        }
570                                }
571                        }                        
572                        
573                        if (indexPath.isEmpty()) {
574                                return null;
575                        }
576                        
577                        switch (parts.size()) {
578                        case 0:
579                                pa[0] = null;
580                                break;
581                        case 1: 
582                                pa[0] = parts.get(0);
583                                break;
584                        default:
585                                pa[0] = new And<E,C>(0, null, parts);
586                                break;
587                        }
588                        
589                        return indexPath.size()==1 ? indexPath.get(0) : new And<E,C>(0, null, indexPath);                        
590                } else if (pa[0] instanceof CommutativeAnd) {
591                        List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
592                        List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((CommutativeAnd<E,C>) pa[0]).getParts());
593                        double currentCost = 0;
594                        int ahead=0;
595                        ListIterator<Predicate<E, C>> pit = parts.listIterator();
596                        while (pit.hasNext()) {
597                                Predicate<E, C> part = pit.next();                                
598                                if (ahead==0) {
599                                        currentCost = part.getCost();
600                                }
601                                ++ahead;
602                                
603                                Set<Integer> parameterIndices = part.parameterIndices();
604                                if (indexes.containsAll(parameterIndices)) { 
605                                        // Extract only if all lower cost predicates before were extracted.
606                                        if (currentCost<part.getCost()) {
607                                                continue;
608                                        }
609                                        // Single matching index predicate - extract
610                                        indexPath.add(part);
611                                        pit.remove();
612                                        --ahead;
613                                } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { 
614                                        // Multi-index predicate with matching index. Try to extract. 
615                                        @SuppressWarnings("unchecked")
616                                        Predicate<E, C>[] spa = new Predicate[] {part};
617                                        Predicate<E, C> subPart = extractIndexPath(indexes, spa);
618                                        if (subPart!=null) {
619                                                indexPath.add(subPart);
620                                                if (spa[0]==null) {
621                                                        pit.remove();
622                                                        --ahead;
623                                                } else {
624                                                        pit.set(spa[0]);
625                                                        currentCost = spa[0].getCost();
626                                                }
627                                        }
628                                }
629                        }                        
630                        
631                        if (indexPath.isEmpty()) {
632                                return null;
633                        }
634                        
635                        switch (parts.size()) {
636                        case 0:
637                                pa[0] = null;
638                                break;
639                        case 1: 
640                                pa[0] = parts.get(0);
641                                break;
642                        default:
643                                pa[0] = new CommutativeAnd<E,C>(0, null, parts);
644                                break;
645                        }
646                        
647                        return indexPath.size()==1 ? indexPath.get(0) : new CommutativeAnd<E,C>(0, null, indexPath);                        
648                }
649                
650                if (indexes.containsAll(pa[0].parameterIndices())) {
651                        Predicate<E, C> ret = pa[0];
652                        pa[0] = null;
653                        return ret;
654                }
655                
656                return null;
657        }
658 
659        /**
660         * Creates joiner which outputs to given event handler.
661         * @param jn
662         */
663        @SuppressWarnings("unchecked")
664        private JoinNode wire(PredicateNode pn, EventHandlerWrapper<E, P, C, K, H, S> handler, int finalCardinality) {
665                JoinNode ret = new JoinNode(finalCardinality, pn);                
666                
667                // Output mappers
668                ret.outputMappers = new Mapper[pn.inputs.size()];
669                int[][] jind = new int[ret.outputMappers.length][];
670                List<Integer> outputIndices = new ArrayList<Integer>(ret.outputIndices);
671                Collections.sort(outputIndices);
672                int idx = 0;
673                for (PredicateNode ipn: pn.inputs) {
674                        if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
675                                JoinInput ji = (JoinInput) ipn.userObject;
676                                int jii = outputIndices.indexOf(ji.index);
677                                if (jii==-1) {
678                                        throw new IllegalStateException("Join input index "+ji.index+" is not present in output indices "+outputIndices);
679                                }
680                                jind[idx] = new int[] {ji.index};
681                                ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {jii});
682                        } else {
683                                List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
684                                Collections.sort(ioi);
685                                int[] map = new int[ioi.size()];
686                                Iterator<Integer> ioiit = ioi.iterator();
687                                for (int i=0; ioiit.hasNext(); ++i) {
688                                        int oi = ioiit.next();
689                                        int jii = outputIndices.indexOf(oi);
690                                        if (jii==-1) {
691                                                throw new IllegalStateException("Input node index "+oi+" at offset "+i+" is not present in output indices "+outputIndices);
692                                        }
693                                        map[i]=jii;
694                                }
695                                jind[idx] = new int[ioi.size()];
696                                for (int i=0; i<jind.length; ++i) {
697                                        jind[idx][i] = ioi.get(i);
698                                }
699                                ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
700                        }
701                }
702                
703                MappingEventHandler<E, P, C, K, H, S> meh = new MappingEventHandler<E, P, C, K, H, S>(handler, ret.outputMappers);
704                ret.joiner = createJoiner(meh, jind);
705                                
706                // Predicate mappers.
707                ret.predicateMappers = new Mapper[pn.inputs.size()];
708                idx = 0;
709                for (PredicateNode ipn: pn.inputs) {
710                        if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
711                                JoinInput ji = (JoinInput) ipn.userObject;
712                                ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {ji.index});
713                        } else {
714                                List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
715                                Collections.sort(ioi);
716                                int [] map = new int[ioi.size()];
717                                Iterator<Integer> ioiit = ioi.iterator();
718                                for (int i=0; ioiit.hasNext(); ++i) {
719                                        map[i]=ioiit.next();
720                                }
721                                ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
722                        }
723                }
724                
725                if (ret.predicate instanceof And || ret.predicate instanceof CommutativeAnd) {
726                        for (Predicate<E, C> p: ((CompositePredicate<E,?,C,?>) ret.predicate).getParts()) {
727                                ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(p, ret.predicateMappers, bus.getEventType()));                                
728                        }
729                } else if (ret.predicate!=null) {
730                        ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(ret.predicate, ret.predicateMappers, bus.getEventType()));
731                }
732                
733                for (int i=0, l=pn.inputs.size(); i<l; ++i) {                        
734                        PredicateNode ipn = pn.inputs.get(i);
735                        if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
736                                JoinInput joinInput = (JoinInput) ipn.userObject;
737                                joinInput.joinNode = ret;
738                                Predicate<E, C> jp;
739                                if (joinInput.predicate==null) {
740                                        jp = True.getInstance(); 
741                                } else {
742                                        jp = MappedPredicate.mapPredicate(joinInput.predicate, joinInput.map);                                                        
743                                }
744        
745                                joinInput.handler = new JoinInputEventHandler(ret, i, handler, jp, 1);
746                        } else {
747                                wire(ipn, new JoinInputEventHandler(ret, i, handler, null, ipn.inputs.size()), finalCardinality).nxt=ret;
748                        }
749                }
750                return ret;
751        }
752 
           /**
            * Removes handlers for the given keys by delegating to <code>rootNode.remove(null, null, true, keys)</code>.
            * The write lock is held for the duration of the call and is released in the <code>finally</code> clause.
            * NOTE(review): the meaning of the first three arguments of <code>remove</code> is not visible here - confirm against its declaration.
            * @param keys Keys passed to the root node for removal.
            */
753        @Override
754        public void removeHandlers(Iterable<K> keys) {
755                getLock().writeLock().lock();
756                try {
757                        rootNode.remove(null, null, true, keys);
758                } finally {
759                        getLock().writeLock().unlock();
760                }                
761        }
762                
763        private void permutate(PredicateNode[] pna, int offset, List<PredicateNode> collector) {
764        if (offset==pna.length) {
765                Set<Integer> set = new TreeSet<Integer>();
766                int joinCount = 0;
767                for (int i=0; i<pna.length; ++i) {
768                    if (pna[i]!=null) {
769                        set.addAll(pna[i].indexes);
770                        ++joinCount;
771                    }
772                }
773                if (set.size()>1 && joinCount>1) {
774                        PredicateNode pn = new PredicateNode(set);
775                        for (PredicateNode input: pna) {
776                                if (input!=null) {
777                                        pn.inputs.add(input);
778                                }
779                        }
780                    collector.add(pn);
781                }
782        } else {
783            PredicateNode pn = pna[offset];
784            permutate(pna, offset+1, collector);
785            if (pn!=null) {
786                    pna[offset] = null;
787                    permutate(pna, offset+1, collector);
788                    pna[offset] = pn;
789            }
790        }
791    }
792        
793    private final Comparator<PredicateNode> INDEX_SET_COMPARATOR = new Comparator<PredicateNode>() {
794 
795        @Override
796        public int compare(PredicateNode p0, PredicateNode p1) {                
797                double cost0 = p0.predicate==null ? 0 : p0.predicate.getCost();
798                        double cost1 = p1.predicate==null ? 0 : p1.predicate.getCost();
799                        double costDelta = cost0 - cost1;
800                if (costDelta<0) {
801                        return -1;
802                }
803                if (costDelta>0) {
804                        return 1;
805                }
806            int sizeDelta = p0.indexes.size() - p1.indexes.size();
807            if (sizeDelta!=0) {
808                    return sizeDelta;
809            }
810 
811            for (int i: p0.indexes) {
812                    for (int j: p1.indexes) {
813                            if (i!=j) {
814                                    return i-j;
815                            }
816                    }
817            }
818 
819            return p0.hashCode() - p1.hashCode();
820        }
821 
822    };
           /** Event bus assigned through <code>setEventBus()</code>; its event type is read when mapped handle predicates are created in <code>wire()</code>. */
823        private EventBus<E, P, C, K, H, S> bus;
824 
825    private int size(PredicateNode[] pna) {
826            int ret=0;
827            for (PredicateNode pn: pna) {
828                    if (pn!=null) {
829                            ++ret;
830                    }
831            }
832            return ret;
833    }
834        
           /**
            * Resets the matcher by calling <code>rootNode.reset()</code> while holding the write lock.
            * The lock is released in the <code>finally</code> clause even if the reset throws.
            */
835        public void reset() {
836                getLock().writeLock().lock();
837                try {
838                        rootNode.reset();
839                } finally {
840                        getLock().writeLock().unlock();
841                }
842        }
843        
           /**
            * Hands this matcher to the given handler manager while holding the write lock.
            * The lock is released in the <code>finally</code> clause even if the manager throws.
            * @param handlerManager Manager whose <code>manageHandlers()</code> is invoked with this matcher.
            */
844        @Override
845        public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
846                getLock().writeLock().lock();
847                try {
848                        handlerManager.manageHandlers(this);
849                } finally {
850                        getLock().writeLock().unlock();
851                }
852        }
853        
           /**
            * Factory method for predicated inference nodes, implemented by subclasses.
            * NOTE(review): parameter descriptions below are inferred from the parameter names - confirm against callers.
            * @param parent Presumably the parent of the node to create.
            * @param predicate Presumably the predicate of the node to create.
            * @param context Presumably the context associated with the predicate.
            * @return inference node.
            */
854        protected abstract PredicatedInferenceNode<E, P, C, K, H, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, K, H, S> parent, Predicate<E, C> predicate, C context);
855        
           /**
            * Returns the set of IDs for the given handler wrapper, implemented by subclasses.
            * NOTE(review): how the IDs are derived from the wrapper is not visible here - confirm against implementations.
            * @param handler Handler wrapper to obtain IDs from.
            * @return set of handler IDs.
            */
856        protected abstract Set<K> extractHandlerIds(EventHandlerWrapper<E, P, C, K, H, S> handler);
857 
858        /**
859         * Generates handler ID. Implemented by subclasses.
860         * @return generated ID; presumably distinct for each call - confirm against implementations.
861         */
862        protected abstract K nextId();
863        
864        /**
865         * Factory method for handler wrapper. Implemented by subclasses.
866         * @param eventHandler Event handler to wrap.
867         * @param registrationKey Key to associate with the wrapper; presumably the key under which the handler is registered - confirm.
868         * @return wrapper for the event handler.
869         */
870        protected abstract EventHandlerWrapper<E, P, C, K, H, S> wrap(EventHandler<E, P, C, H, S> eventHandler,        K registrationKey);
871        
           /**
            * Base class for joiners created by <code>createJoiner()</code>. It joins arrays of handles and
            * adds a snapshot operation to the <code>Joiner</code> contract.
            */
872        protected abstract class EventBusJoiner extends Joiner<Handle<E,P,C,K>[], CompositeContext<E, P, C, K, H, S>, Object> {
873 
874                
                   /**
                    * Passes all arguments through to the <code>Joiner</code> constructor.
                    * @param inputCollectors Collectors for joiner inputs.
                    * @param inputType Type of the joined input, an array of handles.
                    * @param outerJoin Outer join flag; semantics are defined by <code>Joiner</code>.
                    */
875                protected EventBusJoiner(
876                                Collector<Handle<E,P,C,K>[]>[] inputCollectors,
877                                Class<Handle<E,P,C,K>[]> inputType, 
878                                boolean outerJoin) {
879                        
880                        super(inputCollectors, inputType, outerJoin);
881                }
882 
                   /**
                    * Takes a snapshot of the joiner state.
                    * NOTE(review): parameter descriptions are inferred from the parameter names - confirm against implementations.
                    * @param joinNodeId Presumably the ID of the join node which owns this joiner.
                    * @param snapshot Snapshot to add the state to.
                    * @param taken Presumably the IDs which were already added to the snapshot.
                    */
883                protected abstract void takeSnapshot(K joinNodeId, Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken);
884                
885        }
886        
           /**
            * Factory method for joiners, implemented by subclasses. Called from <code>wire()</code> with a mapping event handler
            * and one array of indices per input of the predicate node.
            * @param handler Handler for the joiner; presumably receives the joiner output - confirm against implementations.
            * @param indices Indices of each joiner input, the first dimension is the input position.
            * @return joiner.
            */
887        protected abstract EventBusJoiner createJoiner(JoinEventHandler<E, P, C, K, H, S> handler, int[][] indices);
888 
           /**
            * Stores the event bus in the <code>bus</code> field. No lock is taken.
            * @param bus Event bus to keep a reference to.
            */
889        @Override
890        public void setEventBus(EventBus<E, P, C, K, H, S> bus) {
891                this.bus = bus;                
892        }
893        
           /**
            * @return the event bus stored by <code>setEventBus()</code>, or <code>null</code> if none was set.
            */
894        public EventBus<E, P, C, K, H, S> getEventBus() {
895                return bus;
896        }
897}

[all classes][com.hammurapi.eventbus]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov