1 | package com.hammurapi.eventbus; |
2 | |
3 | import java.lang.ref.Reference; |
4 | import java.lang.ref.WeakReference; |
5 | import java.util.Arrays; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.Date; |
9 | import java.util.HashSet; |
10 | import java.util.List; |
11 | import java.util.Map; |
12 | import java.util.Queue; |
13 | import java.util.Set; |
14 | import java.util.concurrent.ExecutorService; |
15 | import java.util.concurrent.TimeUnit; |
16 | import java.util.concurrent.atomic.AtomicLong; |
17 | import java.util.concurrent.locks.Lock; |
18 | import java.util.logging.Level; |
19 | import java.util.logging.Logger; |
20 | |
21 | import com.hammurapi.common.ExceptionHandler; |
22 | import com.hammurapi.common.Util; |
23 | import com.hammurapi.common.concurrent.TrackingExecutorService; |
24 | import com.hammurapi.eventbus.Matcher.HandlerManager; |
25 | import com.hammurapi.eventbus.monitoring.EventBusStats; |
26 | import com.hammurapi.eventbus.monitoring.Stats; |
27 | import com.hammurapi.eventbus.monitoring.StatsCollector; |
28 | import com.hammurapi.extract.ComparisonResult; |
29 | import com.hammurapi.extract.Extractor; |
30 | import com.hammurapi.extract.Predicate; |
31 | import com.hammurapi.store.Store; |
32 | |
/**
 * Event bus dispatches events to event handlers.
 * @author Pavel Vlasov.
 * @param <E> Event type.
 * @param <P> Priority type.
 * @param <C> Context type.
 * @param <K> Handler registration key.
 * @param <H> Event handle type.
 * @param <S> Event store type.
 */
41 | public abstract class AbstractEventBus<E, P extends Comparable<P>, C, K, H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E,P,C,H,S>> implements EventBus<E, P, C, K, H, S> { |
42 | protected static final Logger logger = Logger.getLogger(AbstractEventBus.class.getName()); |
43 | |
/**
 * Interface to output bus structure for troubleshooting.
 * {@link AbstractEventBus#takeSnapshot(Snapshot)} invokes {@link #start()} first,
 * then hands the snapshot to the matcher, and finally invokes {@link #end(boolean)}.
 * The structure callbacks ({@code handler}, {@code predicateNode}, {@code joinInput},
 * {@code joinNode}) are not invoked from this file; presumably the matcher calls them - confirm.
 * @author Pavel Vlasov
 * @param <E> Event type.
 * @param <P> Priority type.
 * @param <C> Context type.
 * @param <K> Registration key type.
 * @param <H> Event handle type.
 * @param <S> Event store type.
 */
public interface Snapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> {

    /**
     * Invoked before any other methods.
     */
    void start();

    /**
     * Invoked at the end of taking snapshot
     * @param success True if there've been no exceptions.
     */
    void end(boolean success);

    /**
     * Reports an event handler.
     * @param id Handler ID (presumably the registration key - confirm against the matcher).
     * @param eventHandler The handler.
     */
    void handler(K id, EventHandler<E, P, C, H, S> eventHandler);

    /**
     * Reports a predicate node.
     * NOTE(review): the meaning of the child/handler collections is inferred from the
     * parameter names only (nodes and handlers reached on a true/false outcome) - confirm.
     * @param id Node ID.
     * @param predicate Node predicate.
     * @param trueChildren IDs of child nodes on the "true" branch.
     * @param trueHandlers IDs of handlers on the "true" branch.
     * @param falseChildren IDs of child nodes on the "false" branch.
     * @param falseHandlers IDs of handlers on the "false" branch.
     * @param isRoot Whether the node is a root node.
     */
    void predicateNode(K id, Predicate<E, C> predicate, Collection<K> trueChildren, Collection<K> trueHandlers, Collection<K> falseChildren, Collection<K> falseHandlers, boolean isRoot);

    /**
     * Reports an input of a join node.
     * @param id Input ID.
     * @param joinNodeId ID of the join node the input belongs to.
     * @param index Input index (presumably the position within the join node - confirm).
     */
    void joinInput(K id, K joinNodeId, int index);

    /**
     * Reports a join node.
     * @param id Join node ID.
     * @param predicate Node predicate.
     * @param outputIndices Node output indices.
     * @param eventHandlerId Handler ID if this node is a final join node.
     * @param nextJoinNodeId Next join node id for intermediary nodes.
     */
    void joinNode(
            K id,
            Predicate<E, C> predicate,
            Set<Integer> outputIndices,
            K eventHandlerId,
            K nextJoinNodeId);

}
84 | |
/**
 * This snapshot also outputs bus state.
 * {@link AbstractEventBus#takeSnapshot(Snapshot)} feeds the working memory to snapshots
 * implementing this interface before the matcher is asked to output its structure.
 * @author Pavel Vlasov
 *
 * @param <E> Event type.
 * @param <P> Priority type.
 * @param <C> Context type.
 * @param <K> Key (ID) type.
 * @param <H> Event handle type.
 * @param <S> Event store type.
 */
public interface StateSnapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> extends Snapshot<E, P, C, K, H, S> {

    /**
     * Reports an event held in the store. Invoked once per stored handle.
     * @param id Event (handle) ID.
     * @param event The event.
     * @param directPost Value of the handle's {@code isDirectPost()} flag.
     */
    void event(K id, E event, boolean directPost);

    /**
     * Reports one derivation of a stored event. Invoked once per derivation of each handle.
     * @param eventId ID of the derived event's handle.
     * @param eventHandlerId ID of the handler recorded in the derivation.
     * @param inputs IDs of the derivation inputs.
     */
    void derivation(K eventId, K eventHandlerId, List<K> inputs);

    /**
     * Reports contents of a join input collector.
     * NOTE(review): not invoked from this file; parameter semantics below are inferred
     * from names only - confirm against the matcher implementation.
     * @param joinNodeId ID of the owning join node.
     * @param indices Input indices covered by the collector.
     * @param elements Collected elements, each an array of IDs.
     */
    void joinInputCollector(K joinNodeId, int[] indices, Collection<K[]> elements);

}
105 | |
106 | |
/** Event type of this bus, returned by {@link #getEventType()}. */
private Class<E> eventType;

/** Inference policy; decides locking in post() and when queued inference commands are processed. */
private InferencePolicy inferencePolicy;

/** Time unit for statistics. Stored by the constructor; not read anywhere in this file. */
private TimeUnit statsTimeUnit;

/** Optional statistics collector; when not null the bus registers its stats with it in the constructor. */
private StatsCollector statsCollector;
114 | |
115 | private class EventBusStatsImpl implements EventBusStats { |
116 | |
117 | AtomicLong internalPosts = new AtomicLong(); |
118 | |
119 | AtomicLong posts = new AtomicLong(); |
120 | |
121 | AtomicLong postsBaseline = new AtomicLong(); |
122 | AtomicLong conclusionsBaseline = new AtomicLong(); |
123 | |
124 | private Date start = new Date(); |
125 | |
126 | @Override |
127 | public String getName() { |
128 | return "Event Bus"; |
129 | } |
130 | |
131 | @Override |
132 | public void reset() { |
133 | internalPosts.set(0); |
134 | posts.set(0); |
135 | postsBaseline.set(0); |
136 | conclusionsBaseline.set(0); |
137 | start = new Date(); |
138 | } |
139 | |
140 | @Override |
141 | public Iterable<Stats> children() { |
142 | return Collections.emptyList(); |
143 | } |
144 | |
145 | @Override |
146 | public long getPosts() { |
147 | return posts.get(); |
148 | } |
149 | |
150 | @Override |
151 | public long getPostsDelta() { |
152 | long currentPosts = posts.get(); |
153 | return currentPosts - postsBaseline.getAndSet(currentPosts); |
154 | } |
155 | |
156 | @Override |
157 | public long getConclusions() { |
158 | return internalPosts.get(); |
159 | } |
160 | |
161 | @Override |
162 | public long getConclusionsDelta() { |
163 | long currentConclusions = internalPosts.get(); |
164 | return currentConclusions - conclusionsBaseline.getAndSet(currentConclusions); |
165 | } |
166 | |
167 | AtomicLong handlersFired = new AtomicLong(); |
168 | AtomicLong handlersFiredBaseline = new AtomicLong(); |
169 | |
170 | @Override |
171 | public long getHandlersFired() { |
172 | return handlersFired.get(); |
173 | } |
174 | |
175 | @Override |
176 | public long getHandlersFiredDelta() { |
177 | long currentHandlersFired = handlersFired.get(); |
178 | return currentHandlersFired - handlersFiredBaseline.getAndSet(currentHandlersFired); |
179 | } |
180 | |
181 | @Override |
182 | public Date getStart() { |
183 | return start; |
184 | } |
185 | |
186 | } |
187 | |
/** Bus counters; null when no stats collector was passed to the constructor. */
private EventBusStatsImpl stats;

/** Optional filter consulted before each inference command is processed; null means accept all. */
private InferenceFilter<E, P, C, K, H, S> inferenceFilter;

/** Matcher which manages handlers and matches events to them. */
private Matcher<E, P, C, K, H, S> matcher;
193 | |
/**
 * Creates the bus, registers bus statistics with the collector (if one is given)
 * and attaches the bus to the matcher.
 * @param eventType Event type.
 * @param store Event store.
 * @param inferencePolicy Inference policy.
 * @param inferenceFilter Filter of inference commands, may be null.
 * @param statsCollector Statistics collector, null to disable statistics.
 * @param statsTimeUnit Statistics time unit.
 * @param matcher Matcher, must not be null.
 */
public AbstractEventBus(
        Class<E> eventType,
        S store,
        InferencePolicy inferencePolicy,
        InferenceFilter<E,P,C,K,H,S> inferenceFilter,
        StatsCollector statsCollector,
        TimeUnit statsTimeUnit,
        Matcher<E,P,C,K,H,S> matcher) {
    // Plain state first.
    this.eventType = eventType;
    this.store = store;
    this.inferencePolicy = inferencePolicy;
    this.inferenceFilter = inferenceFilter;
    this.statsTimeUnit = statsTimeUnit;
    this.statsCollector = statsCollector;
    this.matcher = matcher;

    // Statistics are collected only when there is a collector to report to.
    if (statsCollector!=null) {
        EventBusStatsImpl busStats = new EventBusStatsImpl();
        this.stats = busStats;
        statsCollector.add(busStats);
    }

    matcher.setEventBus(this);
}
215 | |
216 | @Override |
217 | public Class<E> getEventType() { |
218 | return eventType; |
219 | } |
220 | |
221 | public InferencePolicy getInferencePolicy() { |
222 | return inferencePolicy; |
223 | } |
224 | |
225 | /** Abstract methods **/ |
226 | |
227 | |
/**
 * Instantiates master handle.
 * Invoked by createMasterHandle() while the store write lock is held; the returned
 * handle is then put to the store.
 * @param postCommand Post command for which the handle is created.
 * @return New master handle.
 */
protected abstract H newMasterHandle(PostCommand<E,P,C,K,H,S> postCommand);
234 | |
235 | protected class CreateMasterHandleResult { |
236 | |
237 | private boolean isNew; |
238 | private H handle; |
239 | |
240 | public boolean isNew() { |
241 | return isNew; |
242 | } |
243 | public H getHandle() { |
244 | return handle; |
245 | } |
246 | |
247 | CreateMasterHandleResult(H handle, boolean isNew) { |
248 | super(); |
249 | this.handle = handle; |
250 | this.isNew = isNew; |
251 | } |
252 | |
253 | |
254 | } |
255 | |
256 | |
/**
 * Returns "root" executor service to submit predicate evaluation and handler execution tasks to.
 * {@link #join()} treats a null return value as "nothing to wait for".
 * @return Root executor service, possibly null.
 */
protected abstract TrackingExecutorService getExecutorService();
261 | |
/**
 * Creates a wrapper around the master executor service for task tracking purposes.
 * Not invoked from this file; the semantics of {@code oneOff} and {@code name} are defined by implementations.
 * @param master Master executor service
 * @param oneOff One-off flag.
 * @param name Executor service name.
 * @return Tracking wrapper around the master executor service.
 */
protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
268 | |
/**
 * Create a wrapper around the root executor service for task tracking purposes.
 * Not invoked from this file; the semantics of {@code oneOff} and {@code name} are defined by implementations.
 * @param oneOff One-off flag.
 * @param name Executor service name.
 * @return executor service or null for the synchronous mode.
 */
protected abstract TrackingExecutorService createExecutorService(boolean oneOff, String name);
274 | |
/**
 * Generates handler ID.
 * Not invoked from this file.
 * @return Generated ID.
 */
protected abstract K nextId();
280 | |
/**
 * Helper interface for snapshot taking.
 * workingMemorySnapshot() casts every derivation returned by a stored handle to this
 * interface, so derivations must implement it for state snapshots to work.
 * @author Pavel Vlasov
 *
 * @param <K> ID type.
 */
public interface DerivationEx<K> {
    /**
     * @return ID of the handler recorded in the derivation.
     */
    K getHandlerId();

    /**
     * @return IDs of the derivation inputs.
     */
    List<K> getInputIds();
}
291 | |
292 | @SuppressWarnings("unchecked") |
293 | protected void workingMemorySnapshot(StateSnapshot<E, P, C, K, H, S> snapshot) { |
294 | getStore().readLock().lock(); |
295 | try { |
296 | for (H handle: getStore().getAll()) { |
297 | snapshot.event(handle.getId(), handle.getEvent(), handle.isDirectPost()); |
298 | for (Derivation<E,P,C> d: handle.getDerivations()) { |
299 | snapshot.derivation(handle.getId(), ((DerivationEx<K>) d).getHandlerId(), ((DerivationEx<K>) d).getInputIds()); |
300 | } |
301 | } |
302 | } finally { |
303 | getStore().readLock().unlock(); |
304 | } |
305 | } |
306 | |
307 | // --- End abstract methods --- |
308 | |
309 | |
310 | public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) { |
311 | try { |
312 | snapshot.start(); |
313 | |
314 | // Step 1 Working memory |
315 | if (snapshot instanceof StateSnapshot) { |
316 | workingMemorySnapshot((StateSnapshot<E, P, C, K, H, S>) snapshot); |
317 | } |
318 | |
319 | matcher.takeSnapshot(snapshot); |
320 | |
321 | snapshot.end(true); |
322 | } catch (Exception e) { |
323 | snapshot.end(false); |
324 | throw new DispatchNetworkException("Error taking snapshot: "+e, e); |
325 | } |
326 | } |
327 | |
/** Optional handler notified by post() about exceptions; null means exceptions are only logged. */
private ExceptionHandler exceptionHandler;
329 | |
330 | /* (non-Javadoc) |
331 | * @see com.hammurapi.eventbus.EventBus#setExceptionHandler(com.hammurapi.eventbus.ExceptionHandler) |
332 | */ |
333 | public void setExceptionHandler(ExceptionHandler exceptionHandler) { |
334 | this.exceptionHandler = exceptionHandler; |
335 | } |
336 | |
337 | /* (non-Javadoc) |
338 | * @see com.hammurapi.eventbus.EventBus#getExceptionHandler() |
339 | */ |
340 | public ExceptionHandler getExceptionHandler() { |
341 | return exceptionHandler; |
342 | } |
343 | |
344 | // Constructor which takes executor service and collection factory for joiner. |
345 | // Borrow joiner from |
346 | |
347 | /* (non-Javadoc) |
348 | * @see com.hammurapi.eventbus.EventBus#add(com.hammurapi.eventbus.EventHandler, C, boolean, com.hammurapi.extract.Predicate) |
349 | */ |
350 | @Override |
351 | public K addHandler(final EventHandler<E, P, C, H, S> eventHandler) { |
352 | return matcher.addHandler(eventHandler); |
353 | } |
354 | |
355 | // private JoinNode createJoinNode(PredicateNode pn, |
356 | // EventHandlerWrapper<E, P, C, K, H> handler) { |
357 | // // TODO Auto-generated method stub |
358 | // return null; |
359 | // } |
360 | |
361 | /* (non-Javadoc) |
362 | * @see com.hammurapi.eventbus.EventBus#remove(K) |
363 | */ |
364 | public void removeHandlers(Iterable<K> keys) { |
365 | matcher.removeHandlers(keys); |
366 | } |
367 | |
368 | @Override |
369 | public void removeHandlers(K... keys) { |
370 | removeHandlers(Arrays.asList(keys)); |
371 | } |
372 | |
/**
 * Lock which post() acquires around the whole processing of a posted event when the
 * inference policy is EXCLUSIVE.
 * @return The lock; post() dereferences it without a null check.
 */
protected abstract Lock getRtcLock();
374 | |
375 | /* (non-Javadoc) |
376 | * @see com.hammurapi.eventbus.EventBus#post(E) |
377 | */ |
378 | @Override |
379 | public H post(final E event, Predicate<E, S>... validators) { |
380 | final boolean fineIsLoggable = logger.isLoggable(Level.FINE); |
381 | if (fineIsLoggable) { |
382 | StringBuilder sb = new StringBuilder(); |
383 | sb.append("Client post, event: "+event); |
384 | if (validators.length>0) { |
385 | sb.append(", validators: "+Arrays.toString(validators)); |
386 | } |
387 | logger.fine(sb.toString()); |
388 | } |
389 | |
390 | if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) { |
391 | getRtcLock().lock(); |
392 | } |
393 | try { |
394 | boolean exclusiveOrAfterRoot = InferencePolicy.AFTER_ROOT_EVENT.compareTo(inferencePolicy)<=0; |
395 | PostCommand<E,P,C,K,H,S> postCommand = new PostCommand<E,P,C,K,H,S>(event, true, null, null, null, createInferenceContext(), validators); |
396 | H ret = processInferenceCommand(postCommand); |
397 | if (exclusiveOrAfterRoot) { |
398 | TrackingExecutorService hes = postCommand.getInferenceContext().getExecutorService(); |
399 | if (hes!=null ) { |
400 | hes.join(); |
401 | } |
402 | Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = postCommand.getInferenceContext().getInferenceCommandsQueue(); |
403 | while (!inferenceCommandsQueue.isEmpty()) { |
404 | for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) { |
405 | processInferenceCommand(conclusion); |
406 | } |
407 | if (hes!=null) { |
408 | hes.join(); |
409 | } |
410 | } |
411 | } |
412 | return ret; |
413 | } catch (Exception e) { |
414 | logger.log(Level.SEVERE, "Post error: "+e, e); |
415 | if (getExceptionHandler()!=null) { |
416 | getExceptionHandler().handleException(e); |
417 | } |
418 | return null; |
419 | // masterHandle.handleException(e); |
420 | } finally { |
421 | if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) { |
422 | getRtcLock().unlock(); |
423 | } |
424 | } |
425 | } |
426 | |
427 | /** |
428 | * @param command command to be processed |
429 | * @param executorService Executor service for asynchronous processing |
430 | * @param inferenceCommandsQueue collector of inference commands for further processing. |
431 | */ |
432 | @SuppressWarnings("unchecked") |
433 | protected H processInferenceCommand(InferenceCommand<E, P, C, K, H, S> command) { |
434 | // Filter commands. |
435 | if (inferenceFilter!=null && !inferenceFilter.accept(command, this)) { |
436 | return null; |
437 | } |
438 | if (command instanceof PostCommand) { |
439 | PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) command; |
440 | if (stats!=null) { |
441 | if (postCommand.isDirectPost()) { |
442 | stats.posts.incrementAndGet(); |
443 | } else { |
444 | stats.internalPosts.incrementAndGet(); |
445 | } |
446 | } |
447 | |
448 | try { |
449 | Handle<E,P,C,K> handle = postCommand.isHandleMode() ? postCommand.getHandle() : createMasterHandle(postCommand); |
450 | if (postCommand.isDirectPost()) { |
451 | postCommand.getInferenceContext().setRootHandle(handle); |
452 | } |
453 | Iterable<EventHandlerWrapper<E, P, C, K, H, S>> handlers = matcher.match(handle.getEvent(), postCommand.getInferenceContext().getExecutorService()); |
454 | |
455 | Set<P> consumingPriorities = new HashSet<P>(); |
456 | for (EventHandlerWrapper<E, P, C, K, H, S> ehw: handlers) { |
457 | if (ehw.consumes()) { |
458 | consumingPriorities.add(ehw.getPriority()); |
459 | } |
460 | } |
461 | P prevPriority = null; |
462 | final boolean fineIsLoggable = logger.isLoggable(Level.FINE); |
463 | InferenceContext<E, P, C, K, H, S> inferenceContext = postCommand.getInferenceContext().createNext(); |
464 | TrackingExecutorService es = inferenceContext.getExecutorService(); |
465 | for (EventHandlerWrapper<E, P, C, K, H, S> handler: handlers) { |
466 | // Wait for higher priority handlers to finish. |
467 | if (es!=null && prevPriority!=null && consumingPriorities.contains(prevPriority) && !handler.getPriority().equals(prevPriority)) { |
468 | es.join(); |
469 | if (!handle.isValid()) { |
470 | if (fineIsLoggable) { |
471 | logger.fine("Event was consumed: "+handle.getEvent()); |
472 | } |
473 | |
474 | break; // Event has been consumed. |
475 | } |
476 | } |
477 | |
478 | if (fineIsLoggable) { |
479 | logger.fine("Posting handler to execution: "+handler+" to process event "+handle.getEvent()); |
480 | } |
481 | |
482 | if (stats!=null) { |
483 | stats.handlersFired.incrementAndGet(); |
484 | } |
485 | if (es==null) { |
486 | handler.post(null, inferenceContext, handle); |
487 | } else { |
488 | es.execute(new HandlerTask<E,P,C,K,H,S>(handle, handler, inferenceContext)); |
489 | } |
490 | prevPriority = handler.getPriority(); |
491 | } |
492 | |
493 | if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) { |
494 | if (es!=null) { |
495 | es.join(); |
496 | } |
497 | Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue(); |
498 | for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) { |
499 | processInferenceCommand(conclusion); |
500 | } |
501 | } |
502 | return (H) handle; |
503 | } catch (InterruptedException e) { |
504 | throw new EventDispatchException(e); |
505 | } |
506 | } |
507 | |
508 | if (command instanceof RemoveCommand) { |
509 | processRemoveCommand((RemoveCommand<E,P,C,K,H,S>) command); |
510 | return null; |
511 | } |
512 | |
513 | if (command instanceof RetractCommand) { |
514 | final boolean fineIsLoggable = logger.isLoggable(Level.FINE); |
515 | |
516 | if (fineIsLoggable) { |
517 | logger.fine("Posting retract command to execution: "+command); |
518 | } |
519 | |
520 | if (stats!=null) { |
521 | stats.handlersFired.incrementAndGet(); |
522 | } |
523 | |
524 | InferenceContext<E, P, C, K, H, S> inferenceContext = command.getInferenceContext().createNext(); |
525 | TrackingExecutorService es = inferenceContext.getExecutorService(); |
526 | |
527 | if (es==null) { |
528 | RetractTask.processCommand((RetractCommand<E,P,C,K,H,S>) command, inferenceContext); |
529 | } else { |
530 | es.execute(new RetractTask<E,P,C,K,H,S>((RetractCommand<E,P,C,K,H,S>) command, inferenceContext)); |
531 | } |
532 | |
533 | if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) { |
534 | if (es!=null) { |
535 | try { |
536 | es.join(); |
537 | } catch (InterruptedException e) { |
538 | throw new EventDispatchException(e); |
539 | } |
540 | } |
541 | Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue(); |
542 | for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) { |
543 | processInferenceCommand(conclusion); |
544 | } |
545 | } |
546 | |
547 | return null; |
548 | } |
549 | |
550 | throw new IllegalArgumentException("Unexpected command: "+command); |
551 | } |
552 | |
// A trick to avoid exposure of local handle methods.
/**
 * Processes remove command. Invoked by processInferenceCommand() for commands
 * which are instances of RemoveCommand and were accepted by the inference filter.
 * @param command Command to process.
 */
protected abstract void processRemoveCommand(RemoveCommand<E,P,C,K,H,S> command);
555 | |
556 | /** |
557 | * Creates and returns new master handle. |
558 | * re-dispatch existing event. |
559 | * @param event Event to add to the store. |
560 | * @param joinDelegate |
561 | * @param directPost |
562 | * @param validators |
563 | * @param derivation |
564 | * @return Master handle for new event, null for existing. |
565 | */ |
566 | protected H createMasterHandle(PostCommand<E,P,C,K,H,S> postCommand) { |
567 | getStore().writeLock().lock(); |
568 | try { |
569 | H handle = null; |
570 | if (getStore().getPrimaryKeyExtractor()==null) { |
571 | for (H eh: getStore()) { |
572 | if (eh.isValid() && postCommand.getEvent().equals(eh.getEvent())) { |
573 | handle = eh; |
574 | break; |
575 | } |
576 | } |
577 | } else { |
578 | handle = getStore().getByPrimaryKey(postCommand.getEvent()); |
579 | } |
580 | |
581 | if (handle!=null && !postCommand.isDirectPost()) { |
582 | handle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs()); |
583 | return handle; |
584 | } |
585 | |
586 | handle = newMasterHandle(postCommand); |
587 | Predicate<H, S>[] va = new Predicate[postCommand.getValidators().length]; |
588 | final Reference<E> eventReference = new WeakReference<E>(postCommand.getEvent()); |
589 | for (int i=0; i<postCommand.getValidators().length; ++i) { |
590 | final Predicate<E, S> validator = postCommand.getValidators()[i]; |
591 | |
592 | va[i] = new Predicate<H, S>() { |
593 | |
594 | @Override |
595 | public Boolean extract( |
596 | S context, |
597 | Map<S, Map<Extractor<H, ? super Boolean, S>, ? super Boolean>> cache, |
598 | H... obj) { |
599 | E event = eventReference.get(); |
600 | if (event==null) { |
601 | return false; |
602 | } |
603 | return validator.extract(context, null, Util.wrap(event)); |
604 | } |
605 | |
606 | @Override |
607 | public Set<Integer> parameterIndices() { |
608 | return validator.parameterIndices(); |
609 | } |
610 | |
611 | @Override |
612 | public boolean isContextDependent() { |
613 | return validator.isContextDependent(); |
614 | } |
615 | |
616 | @Override |
617 | public ComparisonResult compareTo(Extractor<H, Boolean, S> other) { |
618 | return ComparisonResult.NOT_EQUAL_NM; // Doesn't matter. |
619 | } |
620 | |
621 | @Override |
622 | public double getCost() { |
623 | return validator.getCost(); |
624 | } |
625 | |
626 | }; |
627 | } |
628 | Store.Handle<H, E, S> sh = getStore().put(handle, va); |
629 | handle.setStoreHandle(sh); |
630 | return handle; |
631 | } finally { |
632 | getStore().writeLock().unlock(); |
633 | } |
634 | } |
635 | |
636 | /* (non-Javadoc) |
637 | * @see com.hammurapi.eventbus.EventBus#reset() |
638 | */ |
639 | public void reset() { |
640 | getStore().clear(); |
641 | matcher.reset(); |
642 | } |
643 | |
644 | // public void rebuildDispatchNetwork() { |
645 | // getBusLock().writeLock().lock(); |
646 | // try { |
647 | // rootNode.rebuild(); |
648 | // } finally { |
649 | // getBusLock().writeLock().unlock(); |
650 | // } |
651 | // } |
652 | |
/**
 * Store entry complements store handle to provide functionality of event handle.
 * This interface is not referenced elsewhere in this file.
 * @author Pavel Vlasov
 *
 * @param <E> Event type.
 * @param <P> Priority type.
 * @param <C> Context type.
 * @param <K> Event id type.
 */
public interface StoreEntry<E,P extends Comparable<P>,C,K> {

    /**
     * @return Derivations of this event.
     */
    Collection<Derivation<E,P,C>> getDerivations();

    /**
     * @param event Event to test.
     * @return true if this entry's event is derived from the given event
     * (presumably directly or transitively - confirm against implementations).
     */
    boolean isDerivedFrom(E event);

    /**
     * Sets new event. Invoked by substitute().
     * For internal use, shall not be invoked by client code.
     * @param event New event.
     */
    void setEvent(E event);

    /**
     * Invalidates this handle, creates a new handle
     * for updated event, posts updated event to the bus,
     * For internal use, shall not be invoked by client code.
     * @return New handle for updated event.
     */
    Handle<E,P,C,K> update();

    /**
     * @return Event id.
     */
    K getId();
}
687 | |
/**
 * Event handle.
 * Extends the public {@link EventBus.Handle} with methods used by the bus implementation.
 * @author Pavel Vlasov
 *
 * @param <E> Event type.
 * @param <P> Priority type.
 * @param <C> Context type.
 * @param <K> Handle id type.
 */
public interface Handle<E,P extends Comparable<P>,C,K> extends EventBus.Handle<E, P, C>, ExceptionHandler {

//    /**
//     * Sets new event. Invoked by substitute().
//     * For internal use, shall not be invoked by client code.
//     * @param event
//     */
//    void setEvent(E event);

    /**
     * For internal use, shall not be invoked by client code.
     * Returns nothing; not invoked from this file.
     */
    void update();

    /**
     * @return Handle id. Reported as event id in state snapshots.
     */
    K getId();

    /**
     * Callback method. For internal use.
     * Invoked by createMasterHandle() with the store handle returned by the store
     * when this handle is put to it.
     * @param storeHandle Store handle of this handle.
     */
    <H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E, P, C, H, S>> void setStoreHandle(Store.Handle<H, E, S> storeHandle);

    /**
     * For internal use.
     * Invoked by createMasterHandle() when an event posted by a handler is already
     * present in the store.
     * @param handlerId ID of the handler which posted the event.
     * @param handler Handler which posted the event.
     * @param inputs Handles of the handler inputs.
     */
    void addDerivation(K handlerId, EventHandler<E, P, C, ?, ?> handler, Handle<E,P,C,K>[] inputs);

    /**
     * @return true if this handle's event was posted by client code.
     */
    boolean isDirectPost();

    /**
     * Sets direct post to true. For internal use.
     */
    void setDirectPost();
}
732 | |
733 | /* (non-Javadoc) |
734 | * @see com.hammurapi.eventbus.EventBus#getDerivations(E) |
735 | */ |
736 | public Collection<Derivation<E,P,C>> getDerivations(E event) { |
737 | H handle = null; |
738 | if (getStore().getPrimaryKeyExtractor()==null) { |
739 | for (H sh: getStore()) { |
740 | if (event.equals(sh.getEvent())) { |
741 | handle = sh; |
742 | break; |
743 | } |
744 | } |
745 | } else { |
746 | handle = getStore().getByPrimaryKey(event); |
747 | } |
748 | |
749 | if (handle==null) { |
750 | return Collections.emptyList(); |
751 | } |
752 | return handle.getDerivations(); |
753 | } |
754 | |
755 | @Override |
756 | public void join() throws InterruptedException { |
757 | if (getExecutorService()!=null) { |
758 | getExecutorService().join(); |
759 | } |
760 | } |
761 | |
762 | public boolean join(long timeout) throws InterruptedException { |
763 | return getExecutorService().join(timeout); |
764 | } |
765 | |
/** Event store passed to the constructor, returned by getStore(). */
private S store;
767 | |
768 | @Override |
769 | public S getStore() { |
770 | return store; |
771 | } |
772 | |
773 | @Override |
774 | public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) { |
775 | matcher.manageHandlers(handlerManager); |
776 | } |
777 | |
/**
 * Creates inference context with zero chain length and no root handle.
 * Inference queue is supplied with AFTER_EVENT, AFTER_ROOT_EVENT and EXCLUSIVE policies.
 * Use this method to create initial/root inference context.
 * post() invokes this method once per posted event to create the context of the post command.
 * @return New root inference context.
 */
protected abstract InferenceContext<E,P,C,K,H,S> createInferenceContext();
785 | |
786 | } |