1 | package com.hammurapi.eventbus.local; |
2 | |
3 | import java.util.Iterator; |
4 | import java.util.Queue; |
5 | import java.util.concurrent.ConcurrentLinkedQueue; |
6 | import java.util.concurrent.ExecutorService; |
7 | import java.util.concurrent.TimeUnit; |
8 | import java.util.concurrent.atomic.AtomicLong; |
9 | import java.util.concurrent.locks.Lock; |
10 | import java.util.concurrent.locks.ReentrantLock; |
11 | import java.util.logging.Logger; |
12 | |
13 | import com.hammurapi.common.ExceptionHandler; |
14 | import com.hammurapi.common.ObservableConverter; |
15 | import com.hammurapi.common.concurrent.LocalTrackingExecutorService; |
16 | import com.hammurapi.common.concurrent.TrackingExecutorService; |
17 | import com.hammurapi.eventbus.AbstractEventBus; |
18 | import com.hammurapi.eventbus.CompositeInferenceFilter; |
19 | import com.hammurapi.eventbus.EventBus; |
20 | import com.hammurapi.eventbus.EventStore; |
21 | import com.hammurapi.eventbus.InferenceCommand; |
22 | import com.hammurapi.eventbus.InferenceContext; |
23 | import com.hammurapi.eventbus.InferenceFilter; |
24 | import com.hammurapi.eventbus.InferencePolicy; |
25 | import com.hammurapi.eventbus.Matcher; |
26 | import com.hammurapi.eventbus.PostCommand; |
27 | import com.hammurapi.eventbus.RemoveCommand; |
28 | import com.hammurapi.eventbus.monitoring.StatsCollector; |
29 | import com.hammurapi.store.local.LocalHandle.HandleStrength; |
30 | |
/**
 * Base class for an event bus which functions within JVM boundaries.
 * The key type of the superclass is fixed to {@link Long}.
 * @author Pavel Vlasov
 *
 * @param <E> Event type.
 * @param <P> Handler priority type.
 * @param <C> Context type.
 * @param <S> Event store type.
 */
40 | |
41 | public class LocalEventBusBase<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> |
42 | extends AbstractEventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> { |
43 | |
44 | |
45 | interface LocalHandle<E,P extends Comparable<P>,C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> extends AbstractEventBus.Handle<E, P, C, Long> { |
46 | void addRemoveListener(RemoveListener<E, P, C, S> removeListener); |
47 | void remove(boolean forUpdate, InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext); |
48 | } |
49 | |
50 | private static final Logger logger = Logger.getLogger(LocalEventBusBase.class.getName()); |
51 | private LocalTrackingExecutorService executorService; |
52 | HandleStrength collectorHandleStrength; |
53 | private int maxDerivationDepth = 100; |
54 | S unmodifiableStore; |
55 | ObservableConverter<E> observableConverter; |
56 | |
57 | public int getMaxDerivationDepth() { |
58 | return maxDerivationDepth; |
59 | } |
60 | |
61 | public void setMaxDerivationDepth(int maxDerivationDepth) { |
62 | this.maxDerivationDepth = maxDerivationDepth; |
63 | } |
64 | |
65 | /** |
66 | * Bus configurator. |
67 | * @author Pavel Vlasov |
68 | * |
69 | */ |
70 | public static class Config<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> { |
71 | private Class<E> eventType; |
72 | private S store; |
73 | private InferencePolicy inferencePolicy = InferencePolicy.IMMEDIATELY; |
74 | private ExecutorService executorService; |
75 | private HandleStrength collectorHandleStrength = HandleStrength.STRONG; |
76 | private ObservableConverter<E> observableConverter; |
77 | private boolean assertPredicatesBeforePost; |
78 | private StatsCollector statsCollector; |
79 | private TimeUnit statsTimeUnit; |
80 | private InferenceFilter<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] inferenceFilters; |
81 | private LocalMatcher<E, P, C, S> matcher = new LocalPredicateChainingMatcher<E, P, C, S>(); |
82 | |
83 | public void setStatsCollector(StatsCollector statsCollector) { |
84 | this.statsCollector = statsCollector; |
85 | } |
86 | |
87 | public StatsCollector getStatsCollector() { |
88 | return statsCollector; |
89 | } |
90 | |
91 | public Class<E> getEventType() { |
92 | return eventType; |
93 | } |
94 | public void setEventType(Class<E> eventType) { |
95 | this.eventType = eventType; |
96 | } |
97 | public S getStore() { |
98 | return store; |
99 | } |
100 | public void setStore(S store) { |
101 | this.store = store; |
102 | } |
103 | public InferencePolicy getInferencePolicy() { |
104 | return inferencePolicy; |
105 | } |
106 | public void setInferencePolicy(InferencePolicy inferencePolicy) { |
107 | this.inferencePolicy = inferencePolicy; |
108 | } |
109 | public ExecutorService getExecutorService() { |
110 | return executorService; |
111 | } |
112 | public void setExecutorService(ExecutorService executorService) { |
113 | this.executorService = executorService; |
114 | } |
115 | public HandleStrength getCollectorHandleStrength() { |
116 | return collectorHandleStrength; |
117 | } |
118 | public void setCollectorHandleStrength(HandleStrength collectorHandleStrength) { |
119 | this.collectorHandleStrength = collectorHandleStrength; |
120 | } |
121 | public ObservableConverter<E> getObservableConverter() { |
122 | return observableConverter; |
123 | } |
124 | public void setObservableConverter(ObservableConverter<E> observableConverter) { |
125 | this.observableConverter = observableConverter; |
126 | } |
127 | public boolean isAssertPredicatesBeforePost() { |
128 | return assertPredicatesBeforePost; |
129 | } |
130 | public void setAssertPredicatesBeforePost(boolean assertPredicatesBeforePost) { |
131 | this.assertPredicatesBeforePost = assertPredicatesBeforePost; |
132 | } |
133 | public TimeUnit getStatsTimeUnit() { |
134 | return statsTimeUnit; |
135 | } |
136 | public void setStatsTimeUnit(TimeUnit statsTimeUnit) { |
137 | this.statsTimeUnit = statsTimeUnit; |
138 | } |
139 | public void setInferenceFilter(InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>... inferenceFilters) { |
140 | this.inferenceFilters = inferenceFilters; |
141 | } |
142 | public InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] getInferenceFilters() { |
143 | return inferenceFilters; |
144 | } |
145 | |
146 | public Matcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getMatcher() { |
147 | return matcher; |
148 | } |
149 | |
150 | public void setMatcher(LocalMatcher<E, P, C, S> matcher) { |
151 | this.matcher = matcher; |
152 | } |
153 | |
154 | InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getInferenceFilter() { |
155 | if (inferenceFilters==null || inferenceFilters.length==0) { |
156 | return null; |
157 | } |
158 | if (inferenceFilters.length==1) { |
159 | return inferenceFilters[0]; |
160 | } |
161 | return new CompositeInferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>(inferenceFilters); |
162 | } |
163 | |
164 | } |
165 | |
166 | private boolean assertPredicatesBeforePost; |
167 | |
168 | boolean isAssertPredicatesBeforePost() { |
169 | return assertPredicatesBeforePost; |
170 | } |
171 | |
172 | HandleStrength getCollectorHandleStrength() { |
173 | return collectorHandleStrength; |
174 | } |
175 | |
176 | public LocalEventBusBase(Config<E,P,C,S> config) { |
177 | |
178 | super( |
179 | config.getEventType(), |
180 | config.getStore(), |
181 | config.getInferencePolicy(), |
182 | config.getInferenceFilter(), |
183 | config.getStatsCollector(), |
184 | config.getStatsTimeUnit(), |
185 | config.getMatcher()); |
186 | |
187 | unmodifiableStore = config.getStore().createUnmodifiableFacade(); |
188 | this.executorService = config.getExecutorService()==null ? null : new LocalTrackingExecutorService(config.getExecutorService(), false, "Bus executor"); |
189 | this.collectorHandleStrength = config.getCollectorHandleStrength(); |
190 | |
191 | this.observableConverter = config.getObservableConverter(); |
192 | this.assertPredicatesBeforePost = config.isAssertPredicatesBeforePost(); |
193 | } |
194 | |
195 | @Override |
196 | protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) { |
197 | return master==null ? null : new LocalTrackingExecutorService(master, oneOff, name); |
198 | } |
199 | |
200 | @Override |
201 | protected TrackingExecutorService createExecutorService( boolean oneOff, String name) { |
202 | TrackingExecutorService es = getExecutorService(); |
203 | return es==null ? null : new LocalTrackingExecutorService(es, oneOff, name); |
204 | } |
205 | |
206 | @Override |
207 | protected TrackingExecutorService getExecutorService() { |
208 | return executorService; |
209 | } |
210 | |
211 | private AtomicLong eventCounter = new AtomicLong(); |
212 | |
213 | @Override |
214 | protected Long nextId() { |
215 | return eventCounter.incrementAndGet(); |
216 | } |
217 | |
218 | @Override |
219 | protected Handle<E, P, C, Long> newMasterHandle(PostCommand<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> postCommand) { |
220 | |
221 | MasterHandle<E,P,C, S> masterHandle = new MasterHandle<E,P,C, S>(this, postCommand.getEvent(), postCommand.getInferenceContext().getExecutorService(), nextId(), postCommand.isDirectPost(), postCommand.getValidators()); |
222 | if (!postCommand.isDirectPost()) { |
223 | masterHandle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs()); |
224 | } |
225 | return masterHandle; |
226 | } |
227 | |
228 | // /** |
229 | // * To make it visible to local classes. |
230 | // */ |
231 | // @Override |
232 | // protected void postInternal( |
233 | // Handle<E, P, C, Long> masterHandle, |
234 | // ExecutorService masterExecutorService, |
235 | // Queue<InferenceCommand<E, P, C, Handle<E, P, C, Long>>> conclusionQueue, |
236 | // ExceptionHandler rootHandle) { |
237 | // super.postInternal(masterHandle, masterExecutorService, conclusionQueue, rootHandle); |
238 | // } |
239 | |
240 | // /** |
241 | // * createMasterHandle() exposed to local dispatch context. |
242 | // */ |
243 | // protected Handle<E,P,C,Long> createMasterHandleExposed(E event, TrackingExecutorService joinDelegate, boolean directPost, Predicate<E,S>[] validators, Derivation<E,P,C> derivation) { |
244 | // CreateMasterHandleResult cmhr = createMasterHandle(event, joinDelegate, directPost, validators, derivation); |
245 | // return cmhr.isNew() ? cmhr.getHandle() : null; |
246 | // } |
247 | |
248 | @Override |
249 | public void remove(E event) { |
250 | if (getStore().getPrimaryKeyExtractor()==null) { |
251 | getStore().writeLock().lock(); |
252 | try { |
253 | for (Handle<E,P,C,Long> handle: getStore()) { |
254 | if (event.equals(handle.getEvent())) { |
255 | handle.remove(); |
256 | } |
257 | } |
258 | } finally { |
259 | getStore().writeLock().unlock(); |
260 | } |
261 | } else { |
262 | Handle<E, P, C, Long> h = getStore().getByPrimaryKey(event); |
263 | if (h!=null) { |
264 | h.remove(); |
265 | } |
266 | } |
267 | } |
268 | |
269 | private Lock rtcLock = new ReentrantLock(); |
270 | |
271 | @Override |
272 | protected Lock getRtcLock() { |
273 | return rtcLock; |
274 | } |
275 | |
276 | /** |
277 | * To make it visible to local classes. |
278 | */ |
279 | @Override |
280 | protected com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long> processInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
281 | return super.processInferenceCommand(command); |
282 | } |
283 | |
284 | @Override |
285 | protected InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext() { |
286 | |
287 | boolean createQueue = InferencePolicy.AFTER_EVENT.compareTo(getInferencePolicy())<=0; |
288 | final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = createQueue ? new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>() : null; |
289 | final TrackingExecutorService hes = createExecutorService(false, "Handle executor"); |
290 | |
291 | return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() { |
292 | |
293 | private ExceptionHandler rootHandle; |
294 | |
295 | @Override |
296 | public int getInferenceChainLength() { |
297 | return 0; |
298 | } |
299 | |
300 | @Override |
301 | public TrackingExecutorService getExecutorService() { |
302 | return hes; |
303 | } |
304 | |
305 | @Override |
306 | public ExceptionHandler getRootHandle() { |
307 | return rootHandle; |
308 | } |
309 | |
310 | @Override |
311 | public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() { |
312 | return LocalEventBusBase.this; |
313 | } |
314 | |
315 | @Override |
316 | public void setRootHandle(ExceptionHandler rootHandle) { |
317 | if (this.rootHandle!=null && this.rootHandle!=rootHandle) { |
318 | throw new IllegalStateException("Root handle already set"); |
319 | } |
320 | this.rootHandle = rootHandle; |
321 | } |
322 | |
323 | @Override |
324 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() { |
325 | return LocalEventBusBase.this.createInferenceContext(this); |
326 | } |
327 | |
328 | |
329 | @Override |
330 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() { |
331 | return LocalEventBusBase.this.wrapInferenceContext(this); |
332 | } |
333 | |
334 | @Override |
335 | public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
336 | if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) { |
337 | LocalEventBusBase.this.processInferenceCommand(command); |
338 | } else { |
339 | inferenceCommandsQueue.add(command); |
340 | } |
341 | } |
342 | |
343 | @Override |
344 | public void processInferenceCommands() { |
345 | Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator(); |
346 | while (cit.hasNext()) { |
347 | LocalEventBusBase.this.processInferenceCommand(cit.next()); |
348 | cit.remove(); |
349 | } |
350 | } |
351 | |
352 | @Override |
353 | public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() { |
354 | return inferenceCommandsQueue; |
355 | } |
356 | |
357 | }; |
358 | } |
359 | |
360 | InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> prev) { |
361 | final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue; |
362 | switch (getInferencePolicy()) { |
363 | case EXCLUSIVE: |
364 | case AFTER_ROOT_EVENT: |
365 | inferenceCommandsQueue = prev.getInferenceCommandsQueue(); // Inherited. |
366 | break; |
367 | case AFTER_EVENT: |
368 | inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event. |
369 | break; |
370 | case AFTER_HANDLER: |
371 | case IMMEDIATELY: |
372 | inferenceCommandsQueue = null; // No queue for |
373 | break; |
374 | default: |
375 | throw new IllegalArgumentException("Unexpected inference policy: "+getInferencePolicy()); |
376 | } |
377 | |
378 | final TrackingExecutorService hes = createExecutorService(prev.getExecutorService(), false, "Handle executor"); |
379 | |
380 | return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() { |
381 | |
382 | @Override |
383 | public int getInferenceChainLength() { |
384 | return prev.getInferenceChainLength()+1; |
385 | } |
386 | |
387 | @Override |
388 | public TrackingExecutorService getExecutorService() { |
389 | return hes; |
390 | } |
391 | |
392 | @Override |
393 | public ExceptionHandler getRootHandle() { |
394 | return prev.getRootHandle(); |
395 | } |
396 | |
397 | @Override |
398 | public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() { |
399 | return LocalEventBusBase.this; |
400 | } |
401 | |
402 | @Override |
403 | public void setRootHandle(ExceptionHandler rootHandle) { |
404 | prev.setRootHandle(rootHandle); |
405 | } |
406 | |
407 | @Override |
408 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() { |
409 | return LocalEventBusBase.this.createInferenceContext(this); |
410 | } |
411 | |
412 | @Override |
413 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() { |
414 | return LocalEventBusBase.this.wrapInferenceContext(this); |
415 | } |
416 | |
417 | |
418 | @Override |
419 | public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
420 | if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) { |
421 | LocalEventBusBase.this.processInferenceCommand(command); |
422 | } else { |
423 | inferenceCommandsQueue.add(command); |
424 | } |
425 | } |
426 | |
427 | @Override |
428 | public void processInferenceCommands() { |
429 | Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator(); |
430 | while (cit.hasNext()) { |
431 | LocalEventBusBase.this.processInferenceCommand(cit.next()); |
432 | cit.remove(); |
433 | } |
434 | } |
435 | |
436 | @Override |
437 | public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() { |
438 | return inferenceCommandsQueue; |
439 | } |
440 | |
441 | }; |
442 | } |
443 | |
444 | InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrapInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> master) { |
445 | if (!InferencePolicy.AFTER_HANDLER.equals(getInferencePolicy())) { |
446 | throw new IllegalStateException("Can't use this method if inference policy is not AFTER_HANDLER"); |
447 | } |
448 | |
449 | final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event. |
450 | |
451 | return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() { |
452 | |
453 | @Override |
454 | public int getInferenceChainLength() { |
455 | return master.getInferenceChainLength(); |
456 | } |
457 | |
458 | @Override |
459 | public TrackingExecutorService getExecutorService() { |
460 | return master.getExecutorService(); |
461 | } |
462 | |
463 | @Override |
464 | public ExceptionHandler getRootHandle() { |
465 | return master.getRootHandle(); |
466 | } |
467 | |
468 | @Override |
469 | public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() { |
470 | return LocalEventBusBase.this; |
471 | } |
472 | |
473 | @Override |
474 | public void setRootHandle(ExceptionHandler rootHandle) { |
475 | throw new UnsupportedOperationException(); |
476 | } |
477 | |
478 | @Override |
479 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() { |
480 | return master.createNext(); |
481 | } |
482 | |
483 | @Override |
484 | public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() { |
485 | return LocalEventBusBase.this.wrapInferenceContext(this); |
486 | } |
487 | |
488 | @Override |
489 | public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
490 | if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) { |
491 | LocalEventBusBase.this.processInferenceCommand(command); |
492 | } else { |
493 | inferenceCommandsQueue.add(command); |
494 | } |
495 | } |
496 | |
497 | @Override |
498 | public void processInferenceCommands() { |
499 | Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator(); |
500 | while (cit.hasNext()) { |
501 | LocalEventBusBase.this.processInferenceCommand(cit.next()); |
502 | cit.remove(); |
503 | } |
504 | } |
505 | |
506 | @Override |
507 | public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() { |
508 | return inferenceCommandsQueue; |
509 | } |
510 | |
511 | }; |
512 | } |
513 | |
514 | @Override |
515 | protected void processRemoveCommand(RemoveCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
516 | ((LocalHandle<E,P,C,S>) command.getHandle()).remove(command.isForUpdate(), command.getInferenceContext()); |
517 | } |
518 | |
519 | // Iterator<InferenceCommand<E, P, C, K, H, S>> cit = inferenceContext.getInferenceCommandsQueue().iterator(); |
520 | // while (cit.hasNext()) { |
521 | // processInferenceCommand(cit.next()); |
522 | // cit.remove(); |
523 | // } |
524 | |
525 | // if (InferencePolicy.IMMEDIATELY.equals(inferenceContext.getBus().getInferencePolicy())) { |
526 | // ((AbstractEventBus<E,P,C,K,H,S>) inferenceContext.getBus()).processInferenceCommand(postCommand); |
527 | // } else { |
528 | |
529 | |
530 | |
531 | } |