001 package com.hammurapi.eventbus;
002
003 import java.lang.reflect.Array;
004 import java.util.ArrayList;
005 import java.util.Arrays;
006 import java.util.Collection;
007 import java.util.Collections;
008 import java.util.Comparator;
009 import java.util.HashMap;
010 import java.util.HashSet;
011 import java.util.Iterator;
012 import java.util.LinkedList;
013 import java.util.List;
014 import java.util.ListIterator;
015 import java.util.Map;
016 import java.util.Set;
017 import java.util.TreeSet;
018 import java.util.concurrent.ExecutionException;
019 import java.util.concurrent.ExecutorService;
020 import java.util.concurrent.Future;
021 import java.util.concurrent.atomic.AtomicReference;
022 import java.util.concurrent.locks.ReadWriteLock;
023 import java.util.logging.Level;
024 import java.util.logging.Logger;
025
026 import com.hammurapi.common.FreezeableCollection;
027 import com.hammurapi.common.Joiner;
028 import com.hammurapi.common.concurrent.TrackingExecutorService;
029 import com.hammurapi.eventbus.AbstractEventBus.Handle;
030 import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
031 import com.hammurapi.extract.And;
032 import com.hammurapi.extract.CommutativeAnd;
033 import com.hammurapi.extract.CommutativeOr;
034 import com.hammurapi.extract.CompositePredicate;
035 import com.hammurapi.extract.Extractor;
036 import com.hammurapi.extract.False;
037 import com.hammurapi.extract.MappedPredicate;
038 import com.hammurapi.extract.Predicate;
039 import com.hammurapi.extract.True;
040
041 public abstract class PredicateChainingMatcher<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements Matcher<E, P, C, K, H, S> {
/** Logger shared by all instances; its name is the fully qualified name of this class. */
private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
043
/**
 * Creates a wrapper around the master executor service for task tracking purposes.
 * <p>
 * {@code match()} invokes this method with {@code oneOff == true} and the name
 * {@code "Handler collector"}. When the result is {@code null}, {@code match()} collects
 * handlers on the calling thread instead of submitting collection tasks.
 *
 * @param master Master executor service.
 * @param oneOff Flag passed through to the implementation; its exact meaning is defined by
 *               subclasses and is not visible in this class (NOTE(review): presumably marks a
 *               service which is used for a single operation - confirm).
 * @param name Name for the created service (NOTE(review): presumably used for diagnostics - confirm).
 * @return Tracking executor service, or {@code null} to request collection on the calling thread.
 */
protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
050
051 @Override
052 public Iterable<EventHandlerWrapper<E, P, C, K, H, S>> match(E event, ExecutorService executorService) {
053 final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
054 if (fineIsLoggable) {
055 logger.fine("Collecting handlers to fire for event "+event);
056 }
057
058 List<EventHandlerWrapper<E, P, C, K, H, S>> handlers = new ArrayList<EventHandlerWrapper<E,P,C,K,H,S>>();
059 Map<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>>();
060
061 getLock().readLock().lock();
062 try {
063 TrackingExecutorService ces = createExecutorService(executorService, true, "Handler collector");
064 if (ces == null) {
065 rootNode.collectHandlers(cache, handlers, event);
066 } else {
067 // Reference for robustness - unset when done.
068 FreezeableCollection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>> collector = new FreezeableCollection<Future<Collection<EventHandlerWrapper<E,P,C,K,H,S>>>>(Collections.synchronizedCollection(new ArrayList<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>()));
069 AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>> collectorRef = new AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>>(Collections.synchronizedCollection(collector));
070 rootNode.collectHandlers(cache, ces, collectorRef, event);
071 try {
072 ces.join();
073 } catch (InterruptedException ie) {
074 throw new EventDispatchException(ie);
075 }
076
077 collector.freeze();
078 for (Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>> future: collectorRef.get()) { // To catch errors when handlers are added after join()
079 try {
080 handlers.addAll(future.get());
081 } catch (ExecutionException e) {
082 throw new EventDispatchException("Problem collecting handlers to fire: "+e, e);
083 } catch (InterruptedException e) {
084 throw new EventDispatchException("Collecting handlers to fire has been interrupted: "+e, e);
085 }
086 }
087 }
088 } finally {
089 getLock().readLock().unlock();
090 }
091
092 // Sort handlers before execution.
093 Collections.sort(handlers, new Comparator<EventHandlerWrapper<E, P, C, K, H, S>>() {
094
095 @Override
096 public int compare(EventHandlerWrapper<E, P, C, K, H, S> o1, EventHandlerWrapper<E, P, C, K, H, S> o2) {
097 P p1 = o1.getPriority();
098 P p2 = o2.getPriority();
099 if (p2==null) {
100 return p1==null ? o2.hashCode() - o1.hashCode() : 1;
101 }
102 if (p1==null) {
103 return -1;
104 }
105 return p2.compareTo(p1);
106 }
107
108 });
109
110 if (fineIsLoggable) {
111 logger.fine("Collected "+handlers.size()+" handlers to fire: "+handlers);
112 }
113
114 return handlers;
115 }
116
117 @Override
118 public K addHandler(EventHandler<E, P, C, H, S> eventHandler) {
119 if (eventHandler.getCardinality()<1) {
120 throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
121 }
122 getLock().writeLock().lock();
123 try {
124 final K ret = nextId();
125 addInternal(wrap(eventHandler, ret));
126 return ret;
127 } finally {
128 getLock().writeLock().unlock();
129 }
130 }
131
/**
 * Lock for the inference network.
 * <p>
 * Within this class the read lock is held by {@code match()}, the write lock is held by
 * {@code addHandler()}, {@code takeSnapshot()}, {@code removeHandlers()}, {@code reset()} and
 * {@code manageHandlers()}. The method is invoked separately for locking and unlocking,
 * so every call is expected to return a lock backed by the same state
 * (NOTE(review): presumably the very same instance - confirm in implementations).
 *
 * @return Read/write lock guarding the inference network.
 */
protected abstract ReadWriteLock getLock();
137
/**
 * Records the state of the dispatch network in the given snapshot.
 * The write lock is held while the snapshot is being taken.
 *
 * @param snapshot Snapshot which receives the network state.
 */
@Override
public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
    getLock().writeLock().lock(); // Locking the bus for writing and, as such, for reading.
    try {
        // Dispatch network and join collections. The new set accumulates ids
        // of elements which have already been written to the snapshot.
        rootNode.takeSnapshot(snapshot, new HashSet<K>());
    } finally {
        getLock().writeLock().unlock(); // Done!
    }
}
148
149 private void addInternal(EventHandlerWrapper<E, P, C, K, H, S> wrappedHandler) {
150 if (wrappedHandler.getPredicate() instanceof CommutativeOr) {
151 for (Predicate<E, C> part: ((CommutativeOr<E, C>) wrappedHandler.getPredicate()).getParts()) {
152 addInternal(new EventHandlerWrapperFilter<E, P, C, K, H, S>(wrappedHandler, part));
153 }
154 } else {
155 // Don't bother if always false
156 if (!False.getInstance().equals(wrappedHandler.getPredicate())) {
157 if (wrappedHandler.getCardinality()==1) {
158 if (!rootNode.addHandler(wrappedHandler, null)) {
159 throw new DispatchNetworkException("Predicate "+wrappedHandler.getPredicate()+" cannot be added to the root node, handler "+wrappedHandler.getHandler());
160 }
161 } else {
162 for (JoinInput ji: buildJoinNetwork(wrappedHandler)) {
163 if (!rootNode.addHandler(ji.handler, null)) {
164 throw new DispatchNetworkException("Predicate "+ji.predicate+" cannot be added to the root node, for join handler");
165 }
166 }
167 }
168 }
169 }
170 }
171
172 protected class JoinInput {
173
174 K id;
175
176 public JoinInput(int index) {
177 this.index = index;
178 this.id = nextId();
179 this.map = new int[] {index};
180 }
181
182 EventHandlerWrapper<E, P, C, K, H, S> handler;
183
184 Predicate<E, C> predicate;
185
186 JoinNode joinNode;
187
188 final int index;
189
190 final int[] map;
191
192 }
193
194 // private JoinNode createJoinNode(PredicateNode pn,
195 // EventHandlerWrapper<E, P, C, K, H> handler) {
196 // // TODO Auto-generated method stub
197 // return null;
198 // }
199
200 protected class JoinInputEventHandler implements EventHandlerWrapper<E, P, C, K, H, S> {
201
202 private EventBusJoiner joiner;
203 private int idx;
204 final K id = nextId();
205 private EventHandlerWrapper<E, P, C, K, H, S> handler;
206 private JoinNode joinNode;
207 private Predicate<E, C> predicate;
208 private int cardinality;
209
210 JoinInputEventHandler(
211 JoinNode joinNode,
212 int idx,
213 EventHandlerWrapper<E, P, C, K, H, S> handler,
214 Predicate<E, C> predicate,
215 int cardinality) {
216
217 this.joiner = joinNode.joiner;
218 this.joinNode = joinNode;
219 this.idx = idx;
220 this.handler = handler;
221 this.joinNode = joinNode;
222 this.predicate = predicate;
223 this.cardinality = cardinality;
224 }
225
226 public K getId() {
227 return id;
228 }
229
230 @Override
231 public boolean consumes() {
232 return handler.consumes();
233 }
234
235 @Override
236 public int getCardinality() {
237 return cardinality;
238 }
239
240 @Override
241 public C getContext() {
242 return handler.getContext();
243 }
244
245 @Override
246 public P getPriority() {
247 return handler.getPriority();
248 }
249
250 @Override
251 public void post(
252 EventDispatchContext<E, P, C, H, S> context,
253 InferenceContext<E,P,C,K,H,S> inferenceContext,
254 Handle<E,P,C,K>... handles) {
255 try {
256
257 if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
258 // if (inferenceContext.getInferenceCommandsQueue()!=null) {
259 // throw new IllegalArgumentException("Inference commands queue shall be null");
260 // }
261 inferenceContext = inferenceContext.wrap();
262 }
263
264 joiner.addInput(idx, handles, new CompositeContext<E, P, C, K, H, S>(
265 context,
266 handler.getContext(),
267 inferenceContext));
268
269 } catch (Exception e) {
270 logger.log(Level.SEVERE, "Join problem: "+e, e);
271 if (bus.getExceptionHandler()!=null) {
272 bus.getExceptionHandler().handleException(e);
273 }
274 if (inferenceContext.getRootHandle()!=null) {
275 inferenceContext.getRootHandle().handleException(e);
276 }
277 } finally {
278 if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
279 inferenceContext.processInferenceCommands();
280 }
281 }
282 }
283
284 @Override
285 public void reset() {
286 joiner.reset();
287 handler.reset();
288 }
289
290 @Override
291 public EventHandler<E, P, C, H, S> getHandler() {
292 throw new UnsupportedOperationException();
293 }
294
295 @Override
296 public Set<K> getRegistrationKeys() {
297 return handler.getRegistrationKeys();
298 }
299
300 @Override
301 public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
302 if (taken.add(id)) {
303 snapshot.joinInput(id, joinNode.id, idx);
304 joinNode.takeSnapshot(snapshot, taken);
305 }
306 }
307
308 @Override
309 public Predicate<E, C> getPredicate() {
310 return predicate;
311 }
312
313 @Override
314 public String toString() {
315 return "JoinInputEventHandler [joiner=" + joiner +
316 ", idx=" + idx +
317 ", id=" + id +
318 ", handler=" + handler +
319 ", joinNode=" + joinNode +
320 ", predicate=" + predicate + "]";
321 }
322
323 @Override
324 public boolean isOneOff() {
325 return handler==null ? joinNode.isOneOff() : handler.isOneOff();
326 }
327
328 @Override
329 public Mode getMode() {
330 return handler==null ? joinNode.getMode() : handler.getMode();
331 }
332
333 }
334
335 /**
336 * Joins several inputs, applies predicate(s)
337 * @author Pavel Vlasov
338 *
339 */
340 protected class JoinNode {
341 final Set<Integer> outputIndices;
342 Predicate<E, C> predicate;
343 EventBusJoiner joiner;
344 EventHandlerWrapper<E, P, C, K, H, S> finalEventHandler;
345 int finalCardinality;
346
347 public JoinNode(int finalCardinality, PredicateNode pn) {
348 if (pn.userObject!=null) {
349 throw new EventBusException("User object is not null!");
350 }
351 this.outputIndices = pn.indexes;
352 this.predicate = pn.predicate;
353 pn.userObject = this;
354 this.finalCardinality = finalCardinality;
355 this.id = nextId();
356 }
357
358 JoinNode nxt;
359 // List<JoinNode> inputNodes = new ArrayList<JoinNode>();
360 //
361 // List<JoinInput> joinInputs = new ArrayList<JoinInput>();
362
363 // Maps input indexes to output
364 Mapper<Handle<E,P,C,K>>[] outputMappers;
365
366 // Maps input indexes to predicate
367 Mapper<Handle<E,P,C,K>>[] predicateMappers;
368
369 K id;
370
371 void setNext(JoinNode next) {
372 nxt = next;
373 // nxt.inputNodes.add(this);
374 }
375
376 void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) {
377 if (taken.add(id)) {
378 if (finalEventHandler!=null) {
379 for (K hid: finalEventHandler.getRegistrationKeys()) {
380 if (taken.add(hid)) {
381 snapshot.handler(hid, finalEventHandler.getHandler());
382 }
383 }
384 }
385 K hid = finalEventHandler==null ? null : finalEventHandler.getRegistrationKeys().iterator().next();
386 if (nxt!=null) {
387 nxt.takeSnapshot(snapshot, taken);
388 }
389 snapshot.joinNode(id, predicate, outputIndices, hid, nxt==null ? null : nxt.id);
390 joiner.takeSnapshot(id, snapshot, taken);
391 }
392 }
393
394 public boolean isOneOff() {
395 return nxt==null ? finalEventHandler.isOneOff() : nxt.isOneOff();
396 }
397
398 public EventHandlerBase.Mode getMode() {
399 return nxt==null ? finalEventHandler.getMode() : nxt.getMode();
400 }
401 }
402
403 /**
404 * Helper class for building join network.
405 * @author Pavel Vlasov
406 *
407 */
408 private class PredicateNode {
409
410 int position;
411
412 PredicateNode(Set<Integer> indexes) {
413 this.indexes = indexes;
414 }
415
416 PredicateNode(int index) {
417 indexes = Collections.singleton(index);
418 }
419
420 Set<Integer> indexes;
421 Predicate<E,C> predicate;
422 List<PredicateNode> inputs = new ArrayList<PredicateNode>();
423 Object userObject;
424 }
425
/**
 * Root inference node with True predicate. Shall be initialized by subclass -
 * this class never assigns the field and dereferences it without a null check
 * when matching, adding and removing handlers, resetting and taking snapshots.
 */
@SuppressWarnings("unchecked")
protected PredicatedInferenceNode<E, P, C, K, H, S> rootNode;
431
432 /**
433 * Constructs join network for multi-event handlers.
434 * @param handler Handler
435 * @param predicate Predicate
436 * @return Collection of input handlers with cardinality 1.
437 * @throws CloneNotSupportedException
438 */
439 @SuppressWarnings("unchecked")
440 private Collection<JoinInput> buildJoinNetwork(final EventHandlerWrapper<E, P, C, K, H, S> handler) {
441
442 Predicate<E,C>[] pa = new Predicate[] {handler.getPredicate()};
443 // Stupid, but Eclipse doesn't allow to create an array of JoinInput using new JoinInput[].
444 JoinInput[] ret = (JoinInput[]) Array.newInstance(JoinInput.class, handler.getCardinality());
445 PredicateNode[] pna = (PredicateNode[]) Array.newInstance(PredicateNode.class, handler.getCardinality());
446 // Initialization.
447 for (int i=0; i<pna.length; ++i) {
448 pna[i]=new PredicateNode(i);
449 pna[i].predicate = extractIndexPath(Collections.singleton(i), pa);
450 pna[i].position = i;
451
452 // Initialization
453 ret[i] = new JoinInput(i);
454 ret[i].predicate = pna[i].predicate;
455 pna[i].userObject = ret[i];
456 }
457
458 while (size(pna)>1 && pa[0]!=null) {
459 List<PredicateNode> collector = new ArrayList<PredicateNode>();
460 permutate(pna, 0, collector);
461 Collections.sort(collector, INDEX_SET_COMPARATOR);
462 for (PredicateNode candidate: collector) {
463 candidate.predicate = extractIndexPath(candidate.indexes, pa);
464 if (candidate.predicate!=null) {
465 candidate.position = candidate.inputs.get(0).position;
466 for (PredicateNode input: candidate.inputs) {
467 pna[input.position]=null;
468 }
469 pna[candidate.position]=candidate;
470 break;
471 }
472 }
473 }
474
475 if (size(pna)>1) {
476 // Create final predicate-less predicate node
477 Set<Integer> allIndexes = new TreeSet<Integer>();
478 for (int i=0; i<handler.getCardinality(); ++i) {
479 allIndexes.add(i);
480 }
481 PredicateNode finalPredicateNode = new PredicateNode(allIndexes);
482 for (int i=0; i<pna.length; ++i) {
483 if (pna[i]!=null) {
484 finalPredicateNode.inputs.add(pna[i]);
485 pna[i]=null;
486 }
487 }
488 pna[0] = finalPredicateNode;
489 }
490
491 if (pa[0]!=null && !(pa[0] instanceof True)) {
492 // Index-less predicate, add to final predicate node.
493 if (pna[0].predicate == null) {
494 pna[0].predicate = pa[0];
495 } else if (pna[0].predicate instanceof And) {
496 pna[0].predicate = ((And<E,C>) pna[0].predicate).add(pa[0]);
497 } else if (pna[0].predicate instanceof CommutativeAnd) {
498 pna[0].predicate = ((CommutativeAnd<E,C>) pna[0].predicate).add(pa[0]);
499 } else {
500 pna[0].predicate = new And<E,C>(0, null, pna[0].predicate, pa[0]);
501 }
502 }
503
504 for (PredicateNode pn: pna) {
505 if (pn!=null) { // There should be only one such node.
506 wire(pn, handler, handler.getCardinality()).finalEventHandler=handler;
507 break;
508 }
509 }
510
511 return Arrays.asList(ret);
512 }
513
514 /**
515 * Extracts path for specified indexes from the predicate, replaces predicate if extraction was successful.
516 * @param idx Index.
517 * @param pa Single element array with source predicate. If extraction is successful then source predicate gets replaced with
518 * predicate without index parts.
519 * @param indexPath Index path to accumulate result.
520 * @return predicate for the specified index or null if extraction wasn't successful.
521 */
522 private Predicate<E,C> extractIndexPath(Set<Integer> indexes, Predicate<E, C>[] pa) {
523
524 if (pa[0] instanceof And) {
525 List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
526 List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((And<E,C>) pa[0]).getParts());
527 double currentCost = 0;
528 int ahead = 0;
529 ListIterator<Predicate<E, C>> pit = parts.listIterator();
530 Z: while (pit.hasNext()) {
531 Predicate<E, C> part = pit.next();
532 if (ahead==0) {
533 currentCost=part.getCost();
534 }
535 ++ahead;
536
537 Set<Integer> parameterIndices = part.parameterIndices();
538 if (indexes.containsAll(parameterIndices)) {
539 // Extract only if all lower cost predicates before were extracted.
540 if (currentCost<part.getCost()) {
541 continue;
542 }
543 // Exact matching index predicate - extract
544 indexPath.add(part);
545 pit.remove();
546 --ahead;
547 } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) {
548 // Multi-index predicate with matching index. Try to extract. Break if not possible.
549 @SuppressWarnings("unchecked")
550 Predicate<E, C>[] spa = new Predicate[] {part};
551 Predicate<E, C> subPart = extractIndexPath(indexes, spa);
552 if (subPart==null) {
553 break;
554 } else {
555 indexPath.add(subPart);
556 if (spa[0]==null) {
557 pit.remove();
558 --ahead;
559 } else {
560 pit.set(spa[0]);
561 currentCost=spa[0].getCost();
562 }
563 for (Integer idx: indexes) {
564 if (spa[0].parameterIndices().contains(idx)) {
565 // Still has the index - break.
566 break Z;
567 }
568 }
569 }
570 }
571 }
572
573 if (indexPath.isEmpty()) {
574 return null;
575 }
576
577 switch (parts.size()) {
578 case 0:
579 pa[0] = null;
580 break;
581 case 1:
582 pa[0] = parts.get(0);
583 break;
584 default:
585 pa[0] = new And<E,C>(0, null, parts);
586 break;
587 }
588
589 return indexPath.size()==1 ? indexPath.get(0) : new And<E,C>(0, null, indexPath);
590 } else if (pa[0] instanceof CommutativeAnd) {
591 List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>();
592 List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((CommutativeAnd<E,C>) pa[0]).getParts());
593 double currentCost = 0;
594 int ahead=0;
595 ListIterator<Predicate<E, C>> pit = parts.listIterator();
596 while (pit.hasNext()) {
597 Predicate<E, C> part = pit.next();
598 if (ahead==0) {
599 currentCost = part.getCost();
600 }
601 ++ahead;
602
603 Set<Integer> parameterIndices = part.parameterIndices();
604 if (indexes.containsAll(parameterIndices)) {
605 // Extract only if all lower cost predicates before were extracted.
606 if (currentCost<part.getCost()) {
607 continue;
608 }
609 // Single matching index predicate - extract
610 indexPath.add(part);
611 pit.remove();
612 --ahead;
613 } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) {
614 // Multi-index predicate with matching index. Try to extract.
615 @SuppressWarnings("unchecked")
616 Predicate<E, C>[] spa = new Predicate[] {part};
617 Predicate<E, C> subPart = extractIndexPath(indexes, spa);
618 if (subPart!=null) {
619 indexPath.add(subPart);
620 if (spa[0]==null) {
621 pit.remove();
622 --ahead;
623 } else {
624 pit.set(spa[0]);
625 currentCost = spa[0].getCost();
626 }
627 }
628 }
629 }
630
631 if (indexPath.isEmpty()) {
632 return null;
633 }
634
635 switch (parts.size()) {
636 case 0:
637 pa[0] = null;
638 break;
639 case 1:
640 pa[0] = parts.get(0);
641 break;
642 default:
643 pa[0] = new CommutativeAnd<E,C>(0, null, parts);
644 break;
645 }
646
647 return indexPath.size()==1 ? indexPath.get(0) : new CommutativeAnd<E,C>(0, null, indexPath);
648 }
649
650 if (indexes.containsAll(pa[0].parameterIndices())) {
651 Predicate<E, C> ret = pa[0];
652 pa[0] = null;
653 return ret;
654 }
655
656 return null;
657 }
658
659 /**
660 * Creates joiner which outputs to given event handler.
661 * @param jn
662 */
663 @SuppressWarnings("unchecked")
664 private JoinNode wire(PredicateNode pn, EventHandlerWrapper<E, P, C, K, H, S> handler, int finalCardinality) {
665 JoinNode ret = new JoinNode(finalCardinality, pn);
666
667 // Output mappers
668 ret.outputMappers = new Mapper[pn.inputs.size()];
669 int[][] jind = new int[ret.outputMappers.length][];
670 List<Integer> outputIndices = new ArrayList<Integer>(ret.outputIndices);
671 Collections.sort(outputIndices);
672 int idx = 0;
673 for (PredicateNode ipn: pn.inputs) {
674 if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
675 JoinInput ji = (JoinInput) ipn.userObject;
676 int jii = outputIndices.indexOf(ji.index);
677 if (jii==-1) {
678 throw new IllegalStateException("Join input index "+ji.index+" is not present in output indices "+outputIndices);
679 }
680 jind[idx] = new int[] {ji.index};
681 ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {jii});
682 } else {
683 List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
684 Collections.sort(ioi);
685 int[] map = new int[ioi.size()];
686 Iterator<Integer> ioiit = ioi.iterator();
687 for (int i=0; ioiit.hasNext(); ++i) {
688 int oi = ioiit.next();
689 int jii = outputIndices.indexOf(oi);
690 if (jii==-1) {
691 throw new IllegalStateException("Input node index "+oi+" at offset "+i+" is not present in output indices "+outputIndices);
692 }
693 map[i]=jii;
694 }
695 jind[idx] = new int[ioi.size()];
696 for (int i=0; i<jind.length; ++i) {
697 jind[idx][i] = ioi.get(i);
698 }
699 ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
700 }
701 }
702
703 MappingEventHandler<E, P, C, K, H, S> meh = new MappingEventHandler<E, P, C, K, H, S>(handler, ret.outputMappers);
704 ret.joiner = createJoiner(meh, jind);
705
706 // Predicate mappers.
707 ret.predicateMappers = new Mapper[pn.inputs.size()];
708 idx = 0;
709 for (PredicateNode ipn: pn.inputs) {
710 if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
711 JoinInput ji = (JoinInput) ipn.userObject;
712 ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {ji.index});
713 } else {
714 List<Integer> ioi = new ArrayList<Integer>(ipn.indexes);
715 Collections.sort(ioi);
716 int [] map = new int[ioi.size()];
717 Iterator<Integer> ioiit = ioi.iterator();
718 for (int i=0; ioiit.hasNext(); ++i) {
719 map[i]=ioiit.next();
720 }
721 ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map);
722 }
723 }
724
725 if (ret.predicate instanceof And || ret.predicate instanceof CommutativeAnd) {
726 for (Predicate<E, C> p: ((CompositePredicate<E,?,C,?>) ret.predicate).getParts()) {
727 ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(p, ret.predicateMappers, bus.getEventType()));
728 }
729 } else if (ret.predicate!=null) {
730 ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(ret.predicate, ret.predicateMappers, bus.getEventType()));
731 }
732
733 for (int i=0, l=pn.inputs.size(); i<l; ++i) {
734 PredicateNode ipn = pn.inputs.get(i);
735 if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) {
736 JoinInput joinInput = (JoinInput) ipn.userObject;
737 joinInput.joinNode = ret;
738 Predicate<E, C> jp;
739 if (joinInput.predicate==null) {
740 jp = True.getInstance();
741 } else {
742 jp = MappedPredicate.mapPredicate(joinInput.predicate, joinInput.map);
743 }
744
745 joinInput.handler = new JoinInputEventHandler(ret, i, handler, jp, 1);
746 } else {
747 wire(ipn, new JoinInputEventHandler(ret, i, handler, null, ipn.inputs.size()), finalCardinality).nxt=ret;
748 }
749 }
750 return ret;
751 }
752
/**
 * Removes handlers by their keys. The removal is delegated to the root node
 * and is performed while holding the write lock.
 *
 * @param keys Keys passed to the root node for removal.
 */
@Override
public void removeHandlers(Iterable<K> keys) {
    getLock().writeLock().lock();
    try {
        // NOTE(review): the meaning of the first three arguments is defined by
        // PredicatedInferenceNode.remove() and is not visible in this class.
        rootNode.remove(null, null, true, keys);
    } finally {
        getLock().writeLock().unlock();
    }
}
762
763 private void permutate(PredicateNode[] pna, int offset, List<PredicateNode> collector) {
764 if (offset==pna.length) {
765 Set<Integer> set = new TreeSet<Integer>();
766 int joinCount = 0;
767 for (int i=0; i<pna.length; ++i) {
768 if (pna[i]!=null) {
769 set.addAll(pna[i].indexes);
770 ++joinCount;
771 }
772 }
773 if (set.size()>1 && joinCount>1) {
774 PredicateNode pn = new PredicateNode(set);
775 for (PredicateNode input: pna) {
776 if (input!=null) {
777 pn.inputs.add(input);
778 }
779 }
780 collector.add(pn);
781 }
782 } else {
783 PredicateNode pn = pna[offset];
784 permutate(pna, offset+1, collector);
785 if (pn!=null) {
786 pna[offset] = null;
787 permutate(pna, offset+1, collector);
788 pna[offset] = pn;
789 }
790 }
791 }
792
793 private final Comparator<PredicateNode> INDEX_SET_COMPARATOR = new Comparator<PredicateNode>() {
794
795 @Override
796 public int compare(PredicateNode p0, PredicateNode p1) {
797 double cost0 = p0.predicate==null ? 0 : p0.predicate.getCost();
798 double cost1 = p1.predicate==null ? 0 : p1.predicate.getCost();
799 double costDelta = cost0 - cost1;
800 if (costDelta<0) {
801 return -1;
802 }
803 if (costDelta>0) {
804 return 1;
805 }
806 int sizeDelta = p0.indexes.size() - p1.indexes.size();
807 if (sizeDelta!=0) {
808 return sizeDelta;
809 }
810
811 for (int i: p0.indexes) {
812 for (int j: p1.indexes) {
813 if (i!=j) {
814 return i-j;
815 }
816 }
817 }
818
819 return p0.hashCode() - p1.hashCode();
820 }
821
822 };
/**
 * Event bus this matcher works for. Assigned by {@code setEventBus()}; read when join predicates are
 * created (event type) and when join problems are reported (exception handler).
 */
private EventBus<E, P, C, K, H, S> bus;
824
825 private int size(PredicateNode[] pna) {
826 int ret=0;
827 for (PredicateNode pn: pna) {
828 if (pn!=null) {
829 ++ret;
830 }
831 }
832 return ret;
833 }
834
/**
 * Resets the root node while holding the write lock.
 */
public void reset() {
    getLock().writeLock().lock();
    try {
        rootNode.reset();
    } finally {
        getLock().writeLock().unlock();
    }
}
843
/**
 * Invokes the handler manager, passing this matcher to it, while holding the write lock.
 *
 * @param handlerManager Manager to invoke.
 */
@Override
public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
    getLock().writeLock().lock();
    try {
        handlerManager.manageHandlers(this);
    } finally {
        getLock().writeLock().unlock();
    }
}
853
/**
 * Factory method for predicated inference nodes. Not invoked by the code of this class.
 * NOTE(review): presumably invoked by node implementations to create child nodes - confirm.
 *
 * @param parent Parent node.
 * @param predicate Node predicate.
 * @param context Context.
 * @return New predicated inference node.
 */
protected abstract PredicatedInferenceNode<E, P, C, K, H, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, K, H, S> parent, Predicate<E, C> predicate, C context);
855
/**
 * Extracts identifiers from the handler wrapper. Not invoked by the code of this class.
 * NOTE(review): the relation of the result to the wrapper's registration keys is defined by subclasses - confirm.
 *
 * @param handler Handler wrapper.
 * @return Handler identifiers.
 */
protected abstract Set<K> extractHandlerIds(EventHandlerWrapper<E, P, C, K, H, S> handler);
857
/**
 * Generates handler ID. In this class generated values are used as handler registration keys
 * and as identifiers of join inputs and join nodes.
 *
 * @return New identifier.
 */
protected abstract K nextId();
863
/**
 * Factory method for handler wrapper.
 *
 * @param eventHandler Handler to wrap.
 * @param registrationKey Key generated for the handler registration; {@code addHandler()} returns this key to the caller.
 * @return Wrapper which is added to the dispatch network.
 */
protected abstract EventHandlerWrapper<E, P, C, K, H, S> wrap(EventHandler<E, P, C, H, S> eventHandler, K registrationKey);
871
/**
 * Joiner of handle arrays which can record its state in a snapshot.
 * Instances are created by {@code createJoiner()}.
 */
protected abstract class EventBusJoiner extends Joiner<Handle<E,P,C,K>[], CompositeContext<E, P, C, K, H, S>, Object> {


    /**
     * Passes all arguments to the superclass constructor.
     */
    protected EventBusJoiner(
            Collector<Handle<E,P,C,K>[]>[] inputCollectors,
            Class<Handle<E,P,C,K>[]> inputType,
            boolean outerJoin) {

        super(inputCollectors, inputType, outerJoin);
    }

    /**
     * Records joiner state in the snapshot. Invoked by the owning join node after the node itself is recorded.
     *
     * @param joinNodeId Identifier of the owning join node.
     * @param snapshot Snapshot to write to.
     * @param taken Ids of already recorded elements.
     */
    protected abstract void takeSnapshot(K joinNodeId, Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken);

}
886
/**
 * Factory method for joiners, invoked once per join node when the join network is wired.
 *
 * @param handler Handler which receives the join output.
 * @param indices One array per joiner input, holding handler parameter indexes provided by the input.
 * @return New joiner.
 */
protected abstract EventBusJoiner createJoiner(JoinEventHandler<E, P, C, K, H, S> handler, int[][] indices);
888
/**
 * Sets the event bus this matcher works for.
 *
 * @param bus Event bus.
 */
@Override
public void setEventBus(EventBus<E, P, C, K, H, S> bus) {
    this.bus = bus;
}
893
/**
 * @return Event bus set by {@code setEventBus()}, {@code null} if it was not set.
 */
public EventBus<E, P, C, K, H, S> getEventBus() {
    return bus;
}
897 }