1 | package com.hammurapi.eventbus; |
2 | |
3 | import java.lang.reflect.Array; |
4 | import java.util.ArrayList; |
5 | import java.util.Arrays; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.Comparator; |
9 | import java.util.HashMap; |
10 | import java.util.HashSet; |
11 | import java.util.Iterator; |
12 | import java.util.LinkedList; |
13 | import java.util.List; |
14 | import java.util.ListIterator; |
15 | import java.util.Map; |
16 | import java.util.Set; |
17 | import java.util.TreeSet; |
18 | import java.util.concurrent.ExecutionException; |
19 | import java.util.concurrent.ExecutorService; |
20 | import java.util.concurrent.Future; |
21 | import java.util.concurrent.atomic.AtomicReference; |
22 | import java.util.concurrent.locks.ReadWriteLock; |
23 | import java.util.logging.Level; |
24 | import java.util.logging.Logger; |
25 | |
26 | import com.hammurapi.common.FreezeableCollection; |
27 | import com.hammurapi.common.Joiner; |
28 | import com.hammurapi.common.concurrent.TrackingExecutorService; |
29 | import com.hammurapi.eventbus.AbstractEventBus.Handle; |
30 | import com.hammurapi.eventbus.AbstractEventBus.Snapshot; |
31 | import com.hammurapi.extract.And; |
32 | import com.hammurapi.extract.CommutativeAnd; |
33 | import com.hammurapi.extract.CommutativeOr; |
34 | import com.hammurapi.extract.CompositePredicate; |
35 | import com.hammurapi.extract.Extractor; |
36 | import com.hammurapi.extract.False; |
37 | import com.hammurapi.extract.MappedPredicate; |
38 | import com.hammurapi.extract.Predicate; |
39 | import com.hammurapi.extract.True; |
40 | |
41 | public abstract class PredicateChainingMatcher<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements Matcher<E, P, C, K, H, S> { |
42 | private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName()); |
43 | |
44 | /** |
45 | * Creates a wrapper around the master executor service for task tracking purposes. |
46 | * @param master Master executor service |
47 | * @return |
48 | */ |
49 | protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name); |
50 | |
51 | @Override |
52 | public Iterable<EventHandlerWrapper<E, P, C, K, H, S>> match(E event, ExecutorService executorService) { |
53 | final boolean fineIsLoggable = logger.isLoggable(Level.FINE); |
54 | if (fineIsLoggable) { |
55 | logger.fine("Collecting handlers to fire for event "+event); |
56 | } |
57 | |
58 | List<EventHandlerWrapper<E, P, C, K, H, S>> handlers = new ArrayList<EventHandlerWrapper<E,P,C,K,H,S>>(); |
59 | Map<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<E, ? super Boolean, C>, ? super Boolean>>(); |
60 | |
61 | getLock().readLock().lock(); |
62 | try { |
63 | TrackingExecutorService ces = createExecutorService(executorService, true, "Handler collector"); |
64 | if (ces == null) { |
65 | rootNode.collectHandlers(cache, handlers, event); |
66 | } else { |
67 | // Reference for robustness - unset when done. |
68 | FreezeableCollection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>> collector = new FreezeableCollection<Future<Collection<EventHandlerWrapper<E,P,C,K,H,S>>>>(Collections.synchronizedCollection(new ArrayList<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>())); |
69 | AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>> collectorRef = new AtomicReference<Collection<Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>>>>(Collections.synchronizedCollection(collector)); |
70 | rootNode.collectHandlers(cache, ces, collectorRef, event); |
71 | try { |
72 | ces.join(); |
73 | } catch (InterruptedException ie) { |
74 | throw new EventDispatchException(ie); |
75 | } |
76 | |
77 | collector.freeze(); |
78 | for (Future<Collection<EventHandlerWrapper<E, P, C, K, H, S>>> future: collectorRef.get()) { // To catch errors when handlers are added after join() |
79 | try { |
80 | handlers.addAll(future.get()); |
81 | } catch (ExecutionException e) { |
82 | throw new EventDispatchException("Problem collecting handlers to fire: "+e, e); |
83 | } catch (InterruptedException e) { |
84 | throw new EventDispatchException("Collecting handlers to fire has been interrupted: "+e, e); |
85 | } |
86 | } |
87 | } |
88 | } finally { |
89 | getLock().readLock().unlock(); |
90 | } |
91 | |
92 | // Sort handlers before execution. |
93 | Collections.sort(handlers, new Comparator<EventHandlerWrapper<E, P, C, K, H, S>>() { |
94 | |
95 | @Override |
96 | public int compare(EventHandlerWrapper<E, P, C, K, H, S> o1, EventHandlerWrapper<E, P, C, K, H, S> o2) { |
97 | P p1 = o1.getPriority(); |
98 | P p2 = o2.getPriority(); |
99 | if (p2==null) { |
100 | return p1==null ? o2.hashCode() - o1.hashCode() : 1; |
101 | } |
102 | if (p1==null) { |
103 | return -1; |
104 | } |
105 | return p2.compareTo(p1); |
106 | } |
107 | |
108 | }); |
109 | |
110 | if (fineIsLoggable) { |
111 | logger.fine("Collected "+handlers.size()+" handlers to fire: "+handlers); |
112 | } |
113 | |
114 | return handlers; |
115 | } |
116 | |
117 | @Override |
118 | public K addHandler(EventHandler<E, P, C, H, S> eventHandler) { |
119 | if (eventHandler.getCardinality()<1) { |
120 | throw new DispatchNetworkException("Handler cardinality cannot be less than one."); |
121 | } |
122 | getLock().writeLock().lock(); |
123 | try { |
124 | final K ret = nextId(); |
125 | addInternal(wrap(eventHandler, ret)); |
126 | return ret; |
127 | } finally { |
128 | getLock().writeLock().unlock(); |
129 | } |
130 | } |
131 | |
132 | /** |
133 | * Lock for the inference network. |
134 | * @return |
135 | */ |
136 | protected abstract ReadWriteLock getLock(); |
137 | |
138 | @Override |
139 | public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) { |
140 | getLock().writeLock().lock(); // Locking the bus for writing and as such for reading. |
141 | try { |
142 | // Step 2 Dispatch network & join collections. |
143 | rootNode.takeSnapshot(snapshot, new HashSet<K>()); |
144 | } finally { |
145 | getLock().writeLock().unlock(); // Done! |
146 | } |
147 | } |
148 | |
149 | private void addInternal(EventHandlerWrapper<E, P, C, K, H, S> wrappedHandler) { |
150 | if (wrappedHandler.getPredicate() instanceof CommutativeOr) { |
151 | for (Predicate<E, C> part: ((CommutativeOr<E, C>) wrappedHandler.getPredicate()).getParts()) { |
152 | addInternal(new EventHandlerWrapperFilter<E, P, C, K, H, S>(wrappedHandler, part)); |
153 | } |
154 | } else { |
155 | // Don't bother if always false |
156 | if (!False.getInstance().equals(wrappedHandler.getPredicate())) { |
157 | if (wrappedHandler.getCardinality()==1) { |
158 | if (!rootNode.addHandler(wrappedHandler, null)) { |
159 | throw new DispatchNetworkException("Predicate "+wrappedHandler.getPredicate()+" cannot be added to the root node, handler "+wrappedHandler.getHandler()); |
160 | } |
161 | } else { |
162 | for (JoinInput ji: buildJoinNetwork(wrappedHandler)) { |
163 | if (!rootNode.addHandler(ji.handler, null)) { |
164 | throw new DispatchNetworkException("Predicate "+ji.predicate+" cannot be added to the root node, for join handler"); |
165 | } |
166 | } |
167 | } |
168 | } |
169 | } |
170 | } |
171 | |
172 | protected class JoinInput { |
173 | |
174 | K id; |
175 | |
176 | public JoinInput(int index) { |
177 | this.index = index; |
178 | this.id = nextId(); |
179 | this.map = new int[] {index}; |
180 | } |
181 | |
182 | EventHandlerWrapper<E, P, C, K, H, S> handler; |
183 | |
184 | Predicate<E, C> predicate; |
185 | |
186 | JoinNode joinNode; |
187 | |
188 | final int index; |
189 | |
190 | final int[] map; |
191 | |
192 | } |
193 | |
194 | // private JoinNode createJoinNode(PredicateNode pn, |
195 | // EventHandlerWrapper<E, P, C, K, H> handler) { |
196 | // // TODO Auto-generated method stub |
197 | // return null; |
198 | // } |
199 | |
200 | protected class JoinInputEventHandler implements EventHandlerWrapper<E, P, C, K, H, S> { |
201 | |
202 | private EventBusJoiner joiner; |
203 | private int idx; |
204 | final K id = nextId(); |
205 | private EventHandlerWrapper<E, P, C, K, H, S> handler; |
206 | private JoinNode joinNode; |
207 | private Predicate<E, C> predicate; |
208 | private int cardinality; |
209 | |
210 | JoinInputEventHandler( |
211 | JoinNode joinNode, |
212 | int idx, |
213 | EventHandlerWrapper<E, P, C, K, H, S> handler, |
214 | Predicate<E, C> predicate, |
215 | int cardinality) { |
216 | |
217 | this.joiner = joinNode.joiner; |
218 | this.joinNode = joinNode; |
219 | this.idx = idx; |
220 | this.handler = handler; |
221 | this.joinNode = joinNode; |
222 | this.predicate = predicate; |
223 | this.cardinality = cardinality; |
224 | } |
225 | |
226 | public K getId() { |
227 | return id; |
228 | } |
229 | |
230 | @Override |
231 | public boolean consumes() { |
232 | return handler.consumes(); |
233 | } |
234 | |
235 | @Override |
236 | public int getCardinality() { |
237 | return cardinality; |
238 | } |
239 | |
240 | @Override |
241 | public C getContext() { |
242 | return handler.getContext(); |
243 | } |
244 | |
245 | @Override |
246 | public P getPriority() { |
247 | return handler.getPriority(); |
248 | } |
249 | |
250 | @Override |
251 | public void post( |
252 | EventDispatchContext<E, P, C, H, S> context, |
253 | InferenceContext<E,P,C,K,H,S> inferenceContext, |
254 | Handle<E,P,C,K>... handles) { |
255 | try { |
256 | |
257 | if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) { |
258 | // if (inferenceContext.getInferenceCommandsQueue()!=null) { |
259 | // throw new IllegalArgumentException("Inference commands queue shall be null"); |
260 | // } |
261 | inferenceContext = inferenceContext.wrap(); |
262 | } |
263 | |
264 | joiner.addInput(idx, handles, new CompositeContext<E, P, C, K, H, S>( |
265 | context, |
266 | handler.getContext(), |
267 | inferenceContext)); |
268 | |
269 | } catch (Exception e) { |
270 | logger.log(Level.SEVERE, "Join problem: "+e, e); |
271 | if (bus.getExceptionHandler()!=null) { |
272 | bus.getExceptionHandler().handleException(e); |
273 | } |
274 | if (inferenceContext.getRootHandle()!=null) { |
275 | inferenceContext.getRootHandle().handleException(e); |
276 | } |
277 | } finally { |
278 | if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) { |
279 | inferenceContext.processInferenceCommands(); |
280 | } |
281 | } |
282 | } |
283 | |
284 | @Override |
285 | public void reset() { |
286 | joiner.reset(); |
287 | handler.reset(); |
288 | } |
289 | |
290 | @Override |
291 | public EventHandler<E, P, C, H, S> getHandler() { |
292 | throw new UnsupportedOperationException(); |
293 | } |
294 | |
295 | @Override |
296 | public Set<K> getRegistrationKeys() { |
297 | return handler.getRegistrationKeys(); |
298 | } |
299 | |
300 | @Override |
301 | public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) { |
302 | if (taken.add(id)) { |
303 | snapshot.joinInput(id, joinNode.id, idx); |
304 | joinNode.takeSnapshot(snapshot, taken); |
305 | } |
306 | } |
307 | |
308 | @Override |
309 | public Predicate<E, C> getPredicate() { |
310 | return predicate; |
311 | } |
312 | |
313 | @Override |
314 | public String toString() { |
315 | return "JoinInputEventHandler [joiner=" + joiner + |
316 | ", idx=" + idx + |
317 | ", id=" + id + |
318 | ", handler=" + handler + |
319 | ", joinNode=" + joinNode + |
320 | ", predicate=" + predicate + "]"; |
321 | } |
322 | |
323 | @Override |
324 | public boolean isOneOff() { |
325 | return handler==null ? joinNode.isOneOff() : handler.isOneOff(); |
326 | } |
327 | |
328 | @Override |
329 | public Mode getMode() { |
330 | return handler==null ? joinNode.getMode() : handler.getMode(); |
331 | } |
332 | |
333 | } |
334 | |
335 | /** |
336 | * Joins several inputs, applies predicate(s) |
337 | * @author Pavel Vlasov |
338 | * |
339 | */ |
340 | protected class JoinNode { |
341 | final Set<Integer> outputIndices; |
342 | Predicate<E, C> predicate; |
343 | EventBusJoiner joiner; |
344 | EventHandlerWrapper<E, P, C, K, H, S> finalEventHandler; |
345 | int finalCardinality; |
346 | |
347 | public JoinNode(int finalCardinality, PredicateNode pn) { |
348 | if (pn.userObject!=null) { |
349 | throw new EventBusException("User object is not null!"); |
350 | } |
351 | this.outputIndices = pn.indexes; |
352 | this.predicate = pn.predicate; |
353 | pn.userObject = this; |
354 | this.finalCardinality = finalCardinality; |
355 | this.id = nextId(); |
356 | } |
357 | |
358 | JoinNode nxt; |
359 | // List<JoinNode> inputNodes = new ArrayList<JoinNode>(); |
360 | // |
361 | // List<JoinInput> joinInputs = new ArrayList<JoinInput>(); |
362 | |
363 | // Maps input indexes to output |
364 | Mapper<Handle<E,P,C,K>>[] outputMappers; |
365 | |
366 | // Maps input indexes to predicate |
367 | Mapper<Handle<E,P,C,K>>[] predicateMappers; |
368 | |
369 | K id; |
370 | |
371 | void setNext(JoinNode next) { |
372 | nxt = next; |
373 | // nxt.inputNodes.add(this); |
374 | } |
375 | |
376 | void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken) { |
377 | if (taken.add(id)) { |
378 | if (finalEventHandler!=null) { |
379 | for (K hid: finalEventHandler.getRegistrationKeys()) { |
380 | if (taken.add(hid)) { |
381 | snapshot.handler(hid, finalEventHandler.getHandler()); |
382 | } |
383 | } |
384 | } |
385 | K hid = finalEventHandler==null ? null : finalEventHandler.getRegistrationKeys().iterator().next(); |
386 | if (nxt!=null) { |
387 | nxt.takeSnapshot(snapshot, taken); |
388 | } |
389 | snapshot.joinNode(id, predicate, outputIndices, hid, nxt==null ? null : nxt.id); |
390 | joiner.takeSnapshot(id, snapshot, taken); |
391 | } |
392 | } |
393 | |
394 | public boolean isOneOff() { |
395 | return nxt==null ? finalEventHandler.isOneOff() : nxt.isOneOff(); |
396 | } |
397 | |
398 | public EventHandlerBase.Mode getMode() { |
399 | return nxt==null ? finalEventHandler.getMode() : nxt.getMode(); |
400 | } |
401 | } |
402 | |
403 | /** |
404 | * Helper class for building join network. |
405 | * @author Pavel Vlasov |
406 | * |
407 | */ |
408 | private class PredicateNode { |
409 | |
410 | int position; |
411 | |
412 | PredicateNode(Set<Integer> indexes) { |
413 | this.indexes = indexes; |
414 | } |
415 | |
416 | PredicateNode(int index) { |
417 | indexes = Collections.singleton(index); |
418 | } |
419 | |
420 | Set<Integer> indexes; |
421 | Predicate<E,C> predicate; |
422 | List<PredicateNode> inputs = new ArrayList<PredicateNode>(); |
423 | Object userObject; |
424 | } |
425 | |
426 | /** |
427 | * Root inference node with True predicate. Shall be initialized by subclass. |
428 | */ |
429 | @SuppressWarnings("unchecked") |
430 | protected PredicatedInferenceNode<E, P, C, K, H, S> rootNode; |
431 | |
432 | /** |
433 | * Constructs join network for multi-event handlers. |
434 | * @param handler Handler |
435 | * @param predicate Predicate |
436 | * @return Collection of input handlers with cardinality 1. |
437 | * @throws CloneNotSupportedException |
438 | */ |
439 | @SuppressWarnings("unchecked") |
440 | private Collection<JoinInput> buildJoinNetwork(final EventHandlerWrapper<E, P, C, K, H, S> handler) { |
441 | |
442 | Predicate<E,C>[] pa = new Predicate[] {handler.getPredicate()}; |
443 | // Stupid, but Eclipse doesn't allow to create an array of JoinInput using new JoinInput[]. |
444 | JoinInput[] ret = (JoinInput[]) Array.newInstance(JoinInput.class, handler.getCardinality()); |
445 | PredicateNode[] pna = (PredicateNode[]) Array.newInstance(PredicateNode.class, handler.getCardinality()); |
446 | // Initialization. |
447 | for (int i=0; i<pna.length; ++i) { |
448 | pna[i]=new PredicateNode(i); |
449 | pna[i].predicate = extractIndexPath(Collections.singleton(i), pa); |
450 | pna[i].position = i; |
451 | |
452 | // Initialization |
453 | ret[i] = new JoinInput(i); |
454 | ret[i].predicate = pna[i].predicate; |
455 | pna[i].userObject = ret[i]; |
456 | } |
457 | |
458 | while (size(pna)>1 && pa[0]!=null) { |
459 | List<PredicateNode> collector = new ArrayList<PredicateNode>(); |
460 | permutate(pna, 0, collector); |
461 | Collections.sort(collector, INDEX_SET_COMPARATOR); |
462 | for (PredicateNode candidate: collector) { |
463 | candidate.predicate = extractIndexPath(candidate.indexes, pa); |
464 | if (candidate.predicate!=null) { |
465 | candidate.position = candidate.inputs.get(0).position; |
466 | for (PredicateNode input: candidate.inputs) { |
467 | pna[input.position]=null; |
468 | } |
469 | pna[candidate.position]=candidate; |
470 | break; |
471 | } |
472 | } |
473 | } |
474 | |
475 | if (size(pna)>1) { |
476 | // Create final predicate-less predicate node |
477 | Set<Integer> allIndexes = new TreeSet<Integer>(); |
478 | for (int i=0; i<handler.getCardinality(); ++i) { |
479 | allIndexes.add(i); |
480 | } |
481 | PredicateNode finalPredicateNode = new PredicateNode(allIndexes); |
482 | for (int i=0; i<pna.length; ++i) { |
483 | if (pna[i]!=null) { |
484 | finalPredicateNode.inputs.add(pna[i]); |
485 | pna[i]=null; |
486 | } |
487 | } |
488 | pna[0] = finalPredicateNode; |
489 | } |
490 | |
491 | if (pa[0]!=null && !(pa[0] instanceof True)) { |
492 | // Index-less predicate, add to final predicate node. |
493 | if (pna[0].predicate == null) { |
494 | pna[0].predicate = pa[0]; |
495 | } else if (pna[0].predicate instanceof And) { |
496 | pna[0].predicate = ((And<E,C>) pna[0].predicate).add(pa[0]); |
497 | } else if (pna[0].predicate instanceof CommutativeAnd) { |
498 | pna[0].predicate = ((CommutativeAnd<E,C>) pna[0].predicate).add(pa[0]); |
499 | } else { |
500 | pna[0].predicate = new And<E,C>(0, null, pna[0].predicate, pa[0]); |
501 | } |
502 | } |
503 | |
504 | for (PredicateNode pn: pna) { |
505 | if (pn!=null) { // There should be only one such node. |
506 | wire(pn, handler, handler.getCardinality()).finalEventHandler=handler; |
507 | break; |
508 | } |
509 | } |
510 | |
511 | return Arrays.asList(ret); |
512 | } |
513 | |
514 | /** |
515 | * Extracts path for specified indexes from the predicate, replaces predicate if extraction was successful. |
516 | * @param idx Index. |
517 | * @param pa Single element array with source predicate. If extraction is successful then source predicate gets replaced with |
518 | * predicate without index parts. |
519 | * @param indexPath Index path to accumulate result. |
520 | * @return predicate for the specified index or null if extraction wasn't successful. |
521 | */ |
522 | private Predicate<E,C> extractIndexPath(Set<Integer> indexes, Predicate<E, C>[] pa) { |
523 | |
524 | if (pa[0] instanceof And) { |
525 | List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>(); |
526 | List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((And<E,C>) pa[0]).getParts()); |
527 | double currentCost = 0; |
528 | int ahead = 0; |
529 | ListIterator<Predicate<E, C>> pit = parts.listIterator(); |
530 | Z: while (pit.hasNext()) { |
531 | Predicate<E, C> part = pit.next(); |
532 | if (ahead==0) { |
533 | currentCost=part.getCost(); |
534 | } |
535 | ++ahead; |
536 | |
537 | Set<Integer> parameterIndices = part.parameterIndices(); |
538 | if (indexes.containsAll(parameterIndices)) { |
539 | // Extract only if all lower cost predicates before were extracted. |
540 | if (currentCost<part.getCost()) { |
541 | continue; |
542 | } |
543 | // Exact matching index predicate - extract |
544 | indexPath.add(part); |
545 | pit.remove(); |
546 | --ahead; |
547 | } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { |
548 | // Multi-index predicate with matching index. Try to extract. Break if not possible. |
549 | @SuppressWarnings("unchecked") |
550 | Predicate<E, C>[] spa = new Predicate[] {part}; |
551 | Predicate<E, C> subPart = extractIndexPath(indexes, spa); |
552 | if (subPart==null) { |
553 | break; |
554 | } else { |
555 | indexPath.add(subPart); |
556 | if (spa[0]==null) { |
557 | pit.remove(); |
558 | --ahead; |
559 | } else { |
560 | pit.set(spa[0]); |
561 | currentCost=spa[0].getCost(); |
562 | } |
563 | for (Integer idx: indexes) { |
564 | if (spa[0].parameterIndices().contains(idx)) { |
565 | // Still has the index - break. |
566 | break Z; |
567 | } |
568 | } |
569 | } |
570 | } |
571 | } |
572 | |
573 | if (indexPath.isEmpty()) { |
574 | return null; |
575 | } |
576 | |
577 | switch (parts.size()) { |
578 | case 0: |
579 | pa[0] = null; |
580 | break; |
581 | case 1: |
582 | pa[0] = parts.get(0); |
583 | break; |
584 | default: |
585 | pa[0] = new And<E,C>(0, null, parts); |
586 | break; |
587 | } |
588 | |
589 | return indexPath.size()==1 ? indexPath.get(0) : new And<E,C>(0, null, indexPath); |
590 | } else if (pa[0] instanceof CommutativeAnd) { |
591 | List<Predicate<E,C>> indexPath = new LinkedList<Predicate<E,C>>(); |
592 | List<Predicate<E,C>> parts = new LinkedList<Predicate<E,C>>(((CommutativeAnd<E,C>) pa[0]).getParts()); |
593 | double currentCost = 0; |
594 | int ahead=0; |
595 | ListIterator<Predicate<E, C>> pit = parts.listIterator(); |
596 | while (pit.hasNext()) { |
597 | Predicate<E, C> part = pit.next(); |
598 | if (ahead==0) { |
599 | currentCost = part.getCost(); |
600 | } |
601 | ++ahead; |
602 | |
603 | Set<Integer> parameterIndices = part.parameterIndices(); |
604 | if (indexes.containsAll(parameterIndices)) { |
605 | // Extract only if all lower cost predicates before were extracted. |
606 | if (currentCost<part.getCost()) { |
607 | continue; |
608 | } |
609 | // Single matching index predicate - extract |
610 | indexPath.add(part); |
611 | pit.remove(); |
612 | --ahead; |
613 | } else if (parameterIndices.size()>indexes.size() && parameterIndices.containsAll(indexes)) { |
614 | // Multi-index predicate with matching index. Try to extract. |
615 | @SuppressWarnings("unchecked") |
616 | Predicate<E, C>[] spa = new Predicate[] {part}; |
617 | Predicate<E, C> subPart = extractIndexPath(indexes, spa); |
618 | if (subPart!=null) { |
619 | indexPath.add(subPart); |
620 | if (spa[0]==null) { |
621 | pit.remove(); |
622 | --ahead; |
623 | } else { |
624 | pit.set(spa[0]); |
625 | currentCost = spa[0].getCost(); |
626 | } |
627 | } |
628 | } |
629 | } |
630 | |
631 | if (indexPath.isEmpty()) { |
632 | return null; |
633 | } |
634 | |
635 | switch (parts.size()) { |
636 | case 0: |
637 | pa[0] = null; |
638 | break; |
639 | case 1: |
640 | pa[0] = parts.get(0); |
641 | break; |
642 | default: |
643 | pa[0] = new CommutativeAnd<E,C>(0, null, parts); |
644 | break; |
645 | } |
646 | |
647 | return indexPath.size()==1 ? indexPath.get(0) : new CommutativeAnd<E,C>(0, null, indexPath); |
648 | } |
649 | |
650 | if (indexes.containsAll(pa[0].parameterIndices())) { |
651 | Predicate<E, C> ret = pa[0]; |
652 | pa[0] = null; |
653 | return ret; |
654 | } |
655 | |
656 | return null; |
657 | } |
658 | |
659 | /** |
660 | * Creates joiner which outputs to given event handler. |
661 | * @param jn |
662 | */ |
663 | @SuppressWarnings("unchecked") |
664 | private JoinNode wire(PredicateNode pn, EventHandlerWrapper<E, P, C, K, H, S> handler, int finalCardinality) { |
665 | JoinNode ret = new JoinNode(finalCardinality, pn); |
666 | |
667 | // Output mappers |
668 | ret.outputMappers = new Mapper[pn.inputs.size()]; |
669 | int[][] jind = new int[ret.outputMappers.length][]; |
670 | List<Integer> outputIndices = new ArrayList<Integer>(ret.outputIndices); |
671 | Collections.sort(outputIndices); |
672 | int idx = 0; |
673 | for (PredicateNode ipn: pn.inputs) { |
674 | if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) { |
675 | JoinInput ji = (JoinInput) ipn.userObject; |
676 | int jii = outputIndices.indexOf(ji.index); |
677 | if (jii==-1) { |
678 | throw new IllegalStateException("Join input index "+ji.index+" is not present in output indices "+outputIndices); |
679 | } |
680 | jind[idx] = new int[] {ji.index}; |
681 | ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {jii}); |
682 | } else { |
683 | List<Integer> ioi = new ArrayList<Integer>(ipn.indexes); |
684 | Collections.sort(ioi); |
685 | int[] map = new int[ioi.size()]; |
686 | Iterator<Integer> ioiit = ioi.iterator(); |
687 | for (int i=0; ioiit.hasNext(); ++i) { |
688 | int oi = ioiit.next(); |
689 | int jii = outputIndices.indexOf(oi); |
690 | if (jii==-1) { |
691 | throw new IllegalStateException("Input node index "+oi+" at offset "+i+" is not present in output indices "+outputIndices); |
692 | } |
693 | map[i]=jii; |
694 | } |
695 | jind[idx] = new int[ioi.size()]; |
696 | for (int i=0; i<jind.length; ++i) { |
697 | jind[idx][i] = ioi.get(i); |
698 | } |
699 | ret.outputMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map); |
700 | } |
701 | } |
702 | |
703 | MappingEventHandler<E, P, C, K, H, S> meh = new MappingEventHandler<E, P, C, K, H, S>(handler, ret.outputMappers); |
704 | ret.joiner = createJoiner(meh, jind); |
705 | |
706 | // Predicate mappers. |
707 | ret.predicateMappers = new Mapper[pn.inputs.size()]; |
708 | idx = 0; |
709 | for (PredicateNode ipn: pn.inputs) { |
710 | if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) { |
711 | JoinInput ji = (JoinInput) ipn.userObject; |
712 | ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(new int[] {ji.index}); |
713 | } else { |
714 | List<Integer> ioi = new ArrayList<Integer>(ipn.indexes); |
715 | Collections.sort(ioi); |
716 | int [] map = new int[ioi.size()]; |
717 | Iterator<Integer> ioiit = ioi.iterator(); |
718 | for (int i=0; ioiit.hasNext(); ++i) { |
719 | map[i]=ioiit.next(); |
720 | } |
721 | ret.predicateMappers[idx++] = new Mapper<Handle<E,P,C,K>>(map); |
722 | } |
723 | } |
724 | |
725 | if (ret.predicate instanceof And || ret.predicate instanceof CommutativeAnd) { |
726 | for (Predicate<E, C> p: ((CompositePredicate<E,?,C,?>) ret.predicate).getParts()) { |
727 | ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(p, ret.predicateMappers, bus.getEventType())); |
728 | } |
729 | } else if (ret.predicate!=null) { |
730 | ret.joiner.addPredicate(new MappedHandlePredicate<E, P, C, K, H, S>(ret.predicate, ret.predicateMappers, bus.getEventType())); |
731 | } |
732 | |
733 | for (int i=0, l=pn.inputs.size(); i<l; ++i) { |
734 | PredicateNode ipn = pn.inputs.get(i); |
735 | if (ipn.userObject instanceof PredicateChainingMatcher.JoinInput) { |
736 | JoinInput joinInput = (JoinInput) ipn.userObject; |
737 | joinInput.joinNode = ret; |
738 | Predicate<E, C> jp; |
739 | if (joinInput.predicate==null) { |
740 | jp = True.getInstance(); |
741 | } else { |
742 | jp = MappedPredicate.mapPredicate(joinInput.predicate, joinInput.map); |
743 | } |
744 | |
745 | joinInput.handler = new JoinInputEventHandler(ret, i, handler, jp, 1); |
746 | } else { |
747 | wire(ipn, new JoinInputEventHandler(ret, i, handler, null, ipn.inputs.size()), finalCardinality).nxt=ret; |
748 | } |
749 | } |
750 | return ret; |
751 | } |
752 | |
753 | @Override |
754 | public void removeHandlers(Iterable<K> keys) { |
755 | getLock().writeLock().lock(); |
756 | try { |
757 | rootNode.remove(null, null, true, keys); |
758 | } finally { |
759 | getLock().writeLock().unlock(); |
760 | } |
761 | } |
762 | |
763 | private void permutate(PredicateNode[] pna, int offset, List<PredicateNode> collector) { |
764 | if (offset==pna.length) { |
765 | Set<Integer> set = new TreeSet<Integer>(); |
766 | int joinCount = 0; |
767 | for (int i=0; i<pna.length; ++i) { |
768 | if (pna[i]!=null) { |
769 | set.addAll(pna[i].indexes); |
770 | ++joinCount; |
771 | } |
772 | } |
773 | if (set.size()>1 && joinCount>1) { |
774 | PredicateNode pn = new PredicateNode(set); |
775 | for (PredicateNode input: pna) { |
776 | if (input!=null) { |
777 | pn.inputs.add(input); |
778 | } |
779 | } |
780 | collector.add(pn); |
781 | } |
782 | } else { |
783 | PredicateNode pn = pna[offset]; |
784 | permutate(pna, offset+1, collector); |
785 | if (pn!=null) { |
786 | pna[offset] = null; |
787 | permutate(pna, offset+1, collector); |
788 | pna[offset] = pn; |
789 | } |
790 | } |
791 | } |
792 | |
793 | private final Comparator<PredicateNode> INDEX_SET_COMPARATOR = new Comparator<PredicateNode>() { |
794 | |
795 | @Override |
796 | public int compare(PredicateNode p0, PredicateNode p1) { |
797 | double cost0 = p0.predicate==null ? 0 : p0.predicate.getCost(); |
798 | double cost1 = p1.predicate==null ? 0 : p1.predicate.getCost(); |
799 | double costDelta = cost0 - cost1; |
800 | if (costDelta<0) { |
801 | return -1; |
802 | } |
803 | if (costDelta>0) { |
804 | return 1; |
805 | } |
806 | int sizeDelta = p0.indexes.size() - p1.indexes.size(); |
807 | if (sizeDelta!=0) { |
808 | return sizeDelta; |
809 | } |
810 | |
811 | for (int i: p0.indexes) { |
812 | for (int j: p1.indexes) { |
813 | if (i!=j) { |
814 | return i-j; |
815 | } |
816 | } |
817 | } |
818 | |
819 | return p0.hashCode() - p1.hashCode(); |
820 | } |
821 | |
822 | }; |
823 | private EventBus<E, P, C, K, H, S> bus; |
824 | |
825 | private int size(PredicateNode[] pna) { |
826 | int ret=0; |
827 | for (PredicateNode pn: pna) { |
828 | if (pn!=null) { |
829 | ++ret; |
830 | } |
831 | } |
832 | return ret; |
833 | } |
834 | |
835 | public void reset() { |
836 | getLock().writeLock().lock(); |
837 | try { |
838 | rootNode.reset(); |
839 | } finally { |
840 | getLock().writeLock().unlock(); |
841 | } |
842 | } |
843 | |
844 | @Override |
845 | public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) { |
846 | getLock().writeLock().lock(); |
847 | try { |
848 | handlerManager.manageHandlers(this); |
849 | } finally { |
850 | getLock().writeLock().unlock(); |
851 | } |
852 | } |
853 | |
854 | protected abstract PredicatedInferenceNode<E, P, C, K, H, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, K, H, S> parent, Predicate<E, C> predicate, C context); |
855 | |
856 | protected abstract Set<K> extractHandlerIds(EventHandlerWrapper<E, P, C, K, H, S> handler); |
857 | |
858 | /** |
859 | * Generates handler ID. |
860 | * @return |
861 | */ |
862 | protected abstract K nextId(); |
863 | |
864 | /** |
865 | * Factory method for handler wrapper. |
866 | * @param eventHandler |
867 | * @param oneOff |
868 | * @return |
869 | */ |
870 | protected abstract EventHandlerWrapper<E, P, C, K, H, S> wrap(EventHandler<E, P, C, H, S> eventHandler, K registrationKey); |
871 | |
872 | protected abstract class EventBusJoiner extends Joiner<Handle<E,P,C,K>[], CompositeContext<E, P, C, K, H, S>, Object> { |
873 | |
874 | |
875 | protected EventBusJoiner( |
876 | Collector<Handle<E,P,C,K>[]>[] inputCollectors, |
877 | Class<Handle<E,P,C,K>[]> inputType, |
878 | boolean outerJoin) { |
879 | |
880 | super(inputCollectors, inputType, outerJoin); |
881 | } |
882 | |
883 | protected abstract void takeSnapshot(K joinNodeId, Snapshot<E, P, C, K, H, S> snapshot, Set<K> taken); |
884 | |
885 | } |
886 | |
887 | protected abstract EventBusJoiner createJoiner(JoinEventHandler<E, P, C, K, H, S> handler, int[][] indices); |
888 | |
889 | @Override |
890 | public void setEventBus(EventBus<E, P, C, K, H, S> bus) { |
891 | this.bus = bus; |
892 | } |
893 | |
894 | public EventBus<E, P, C, K, H, S> getEventBus() { |
895 | return bus; |
896 | } |
897 | } |