001 package com.hammurapi.eventbus.local;
002
003 import java.util.Iterator;
004 import java.util.Queue;
005 import java.util.concurrent.ConcurrentLinkedQueue;
006 import java.util.concurrent.ExecutorService;
007 import java.util.concurrent.TimeUnit;
008 import java.util.concurrent.atomic.AtomicLong;
009 import java.util.concurrent.locks.Lock;
010 import java.util.concurrent.locks.ReentrantLock;
011 import java.util.logging.Logger;
012
013 import com.hammurapi.common.ExceptionHandler;
014 import com.hammurapi.common.ObservableConverter;
015 import com.hammurapi.common.concurrent.LocalTrackingExecutorService;
016 import com.hammurapi.common.concurrent.TrackingExecutorService;
017 import com.hammurapi.eventbus.AbstractEventBus;
018 import com.hammurapi.eventbus.CompositeInferenceFilter;
019 import com.hammurapi.eventbus.EventBus;
020 import com.hammurapi.eventbus.EventStore;
021 import com.hammurapi.eventbus.InferenceCommand;
022 import com.hammurapi.eventbus.InferenceContext;
023 import com.hammurapi.eventbus.InferenceFilter;
024 import com.hammurapi.eventbus.InferencePolicy;
025 import com.hammurapi.eventbus.Matcher;
026 import com.hammurapi.eventbus.PostCommand;
027 import com.hammurapi.eventbus.RemoveCommand;
028 import com.hammurapi.eventbus.monitoring.StatsCollector;
029 import com.hammurapi.store.local.LocalHandle.HandleStrength;
030
031 /**
032 * Base class for event bus which functions within JVM boundaries.
033 * @author Pavel Vlasov
034 *
035 * @param <E> Event type.
036 * @param <P> Handler priority type.
037 * @param <C> Context type.
038 * @param <K> Registration key type.
039 */
040
041 public class LocalEventBusBase<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>>
042 extends AbstractEventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> {
043
044
045 interface LocalHandle<E,P extends Comparable<P>,C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> extends AbstractEventBus.Handle<E, P, C, Long> {
046 void addRemoveListener(RemoveListener<E, P, C, S> removeListener);
047 void remove(boolean forUpdate, InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext);
048 }
049
050 private static final Logger logger = Logger.getLogger(LocalEventBusBase.class.getName());
051 private LocalTrackingExecutorService executorService;
052 HandleStrength collectorHandleStrength;
053 private int maxDerivationDepth = 100;
054 S unmodifiableStore;
055 ObservableConverter<E> observableConverter;
056
057 public int getMaxDerivationDepth() {
058 return maxDerivationDepth;
059 }
060
061 public void setMaxDerivationDepth(int maxDerivationDepth) {
062 this.maxDerivationDepth = maxDerivationDepth;
063 }
064
065 /**
066 * Bus configurator.
067 * @author Pavel Vlasov
068 *
069 */
070 public static class Config<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> {
071 private Class<E> eventType;
072 private S store;
073 private InferencePolicy inferencePolicy = InferencePolicy.IMMEDIATELY;
074 private ExecutorService executorService;
075 private HandleStrength collectorHandleStrength = HandleStrength.STRONG;
076 private ObservableConverter<E> observableConverter;
077 private boolean assertPredicatesBeforePost;
078 private StatsCollector statsCollector;
079 private TimeUnit statsTimeUnit;
080 private InferenceFilter<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] inferenceFilters;
081 private LocalMatcher<E, P, C, S> matcher = new LocalPredicateChainingMatcher<E, P, C, S>();
082
083 public void setStatsCollector(StatsCollector statsCollector) {
084 this.statsCollector = statsCollector;
085 }
086
087 public StatsCollector getStatsCollector() {
088 return statsCollector;
089 }
090
091 public Class<E> getEventType() {
092 return eventType;
093 }
094 public void setEventType(Class<E> eventType) {
095 this.eventType = eventType;
096 }
097 public S getStore() {
098 return store;
099 }
100 public void setStore(S store) {
101 this.store = store;
102 }
103 public InferencePolicy getInferencePolicy() {
104 return inferencePolicy;
105 }
106 public void setInferencePolicy(InferencePolicy inferencePolicy) {
107 this.inferencePolicy = inferencePolicy;
108 }
109 public ExecutorService getExecutorService() {
110 return executorService;
111 }
112 public void setExecutorService(ExecutorService executorService) {
113 this.executorService = executorService;
114 }
115 public HandleStrength getCollectorHandleStrength() {
116 return collectorHandleStrength;
117 }
118 public void setCollectorHandleStrength(HandleStrength collectorHandleStrength) {
119 this.collectorHandleStrength = collectorHandleStrength;
120 }
121 public ObservableConverter<E> getObservableConverter() {
122 return observableConverter;
123 }
124 public void setObservableConverter(ObservableConverter<E> observableConverter) {
125 this.observableConverter = observableConverter;
126 }
127 public boolean isAssertPredicatesBeforePost() {
128 return assertPredicatesBeforePost;
129 }
130 public void setAssertPredicatesBeforePost(boolean assertPredicatesBeforePost) {
131 this.assertPredicatesBeforePost = assertPredicatesBeforePost;
132 }
133 public TimeUnit getStatsTimeUnit() {
134 return statsTimeUnit;
135 }
136 public void setStatsTimeUnit(TimeUnit statsTimeUnit) {
137 this.statsTimeUnit = statsTimeUnit;
138 }
139 public void setInferenceFilter(InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>... inferenceFilters) {
140 this.inferenceFilters = inferenceFilters;
141 }
142 public InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] getInferenceFilters() {
143 return inferenceFilters;
144 }
145
146 public Matcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getMatcher() {
147 return matcher;
148 }
149
150 public void setMatcher(LocalMatcher<E, P, C, S> matcher) {
151 this.matcher = matcher;
152 }
153
154 InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getInferenceFilter() {
155 if (inferenceFilters==null || inferenceFilters.length==0) {
156 return null;
157 }
158 if (inferenceFilters.length==1) {
159 return inferenceFilters[0];
160 }
161 return new CompositeInferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>(inferenceFilters);
162 }
163
164 }
165
166 private boolean assertPredicatesBeforePost;
167
168 boolean isAssertPredicatesBeforePost() {
169 return assertPredicatesBeforePost;
170 }
171
172 HandleStrength getCollectorHandleStrength() {
173 return collectorHandleStrength;
174 }
175
176 public LocalEventBusBase(Config<E,P,C,S> config) {
177
178 super(
179 config.getEventType(),
180 config.getStore(),
181 config.getInferencePolicy(),
182 config.getInferenceFilter(),
183 config.getStatsCollector(),
184 config.getStatsTimeUnit(),
185 config.getMatcher());
186
187 unmodifiableStore = config.getStore().createUnmodifiableFacade();
188 this.executorService = config.getExecutorService()==null ? null : new LocalTrackingExecutorService(config.getExecutorService(), false, "Bus executor");
189 this.collectorHandleStrength = config.getCollectorHandleStrength();
190
191 this.observableConverter = config.getObservableConverter();
192 this.assertPredicatesBeforePost = config.isAssertPredicatesBeforePost();
193 }
194
195 @Override
196 protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) {
197 return master==null ? null : new LocalTrackingExecutorService(master, oneOff, name);
198 }
199
200 @Override
201 protected TrackingExecutorService createExecutorService( boolean oneOff, String name) {
202 TrackingExecutorService es = getExecutorService();
203 return es==null ? null : new LocalTrackingExecutorService(es, oneOff, name);
204 }
205
206 @Override
207 protected TrackingExecutorService getExecutorService() {
208 return executorService;
209 }
210
211 private AtomicLong eventCounter = new AtomicLong();
212
213 @Override
214 protected Long nextId() {
215 return eventCounter.incrementAndGet();
216 }
217
218 @Override
219 protected Handle<E, P, C, Long> newMasterHandle(PostCommand<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> postCommand) {
220
221 MasterHandle<E,P,C, S> masterHandle = new MasterHandle<E,P,C, S>(this, postCommand.getEvent(), postCommand.getInferenceContext().getExecutorService(), nextId(), postCommand.isDirectPost(), postCommand.getValidators());
222 if (!postCommand.isDirectPost()) {
223 masterHandle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs());
224 }
225 return masterHandle;
226 }
227
228 // /**
229 // * To make it visible to local classes.
230 // */
231 // @Override
232 // protected void postInternal(
233 // Handle<E, P, C, Long> masterHandle,
234 // ExecutorService masterExecutorService,
235 // Queue<InferenceCommand<E, P, C, Handle<E, P, C, Long>>> conclusionQueue,
236 // ExceptionHandler rootHandle) {
237 // super.postInternal(masterHandle, masterExecutorService, conclusionQueue, rootHandle);
238 // }
239
240 // /**
241 // * createMasterHandle() exposed to local dispatch context.
242 // */
243 // protected Handle<E,P,C,Long> createMasterHandleExposed(E event, TrackingExecutorService joinDelegate, boolean directPost, Predicate<E,S>[] validators, Derivation<E,P,C> derivation) {
244 // CreateMasterHandleResult cmhr = createMasterHandle(event, joinDelegate, directPost, validators, derivation);
245 // return cmhr.isNew() ? cmhr.getHandle() : null;
246 // }
247
248 @Override
249 public void remove(E event) {
250 if (getStore().getPrimaryKeyExtractor()==null) {
251 getStore().writeLock().lock();
252 try {
253 for (Handle<E,P,C,Long> handle: getStore()) {
254 if (event.equals(handle.getEvent())) {
255 handle.remove();
256 }
257 }
258 } finally {
259 getStore().writeLock().unlock();
260 }
261 } else {
262 Handle<E, P, C, Long> h = getStore().getByPrimaryKey(event);
263 if (h!=null) {
264 h.remove();
265 }
266 }
267 }
268
269 private Lock rtcLock = new ReentrantLock();
270
271 @Override
272 protected Lock getRtcLock() {
273 return rtcLock;
274 }
275
276 /**
277 * To make it visible to local classes.
278 */
279 @Override
280 protected com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long> processInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
281 return super.processInferenceCommand(command);
282 }
283
284 @Override
285 protected InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext() {
286
287 boolean createQueue = InferencePolicy.AFTER_EVENT.compareTo(getInferencePolicy())<=0;
288 final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = createQueue ? new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>() : null;
289 final TrackingExecutorService hes = createExecutorService(false, "Handle executor");
290
291 return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
292
293 private ExceptionHandler rootHandle;
294
295 @Override
296 public int getInferenceChainLength() {
297 return 0;
298 }
299
300 @Override
301 public TrackingExecutorService getExecutorService() {
302 return hes;
303 }
304
305 @Override
306 public ExceptionHandler getRootHandle() {
307 return rootHandle;
308 }
309
310 @Override
311 public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
312 return LocalEventBusBase.this;
313 }
314
315 @Override
316 public void setRootHandle(ExceptionHandler rootHandle) {
317 if (this.rootHandle!=null && this.rootHandle!=rootHandle) {
318 throw new IllegalStateException("Root handle already set");
319 }
320 this.rootHandle = rootHandle;
321 }
322
323 @Override
324 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
325 return LocalEventBusBase.this.createInferenceContext(this);
326 }
327
328
329 @Override
330 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
331 return LocalEventBusBase.this.wrapInferenceContext(this);
332 }
333
334 @Override
335 public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
336 if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
337 LocalEventBusBase.this.processInferenceCommand(command);
338 } else {
339 inferenceCommandsQueue.add(command);
340 }
341 }
342
343 @Override
344 public void processInferenceCommands() {
345 Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
346 while (cit.hasNext()) {
347 LocalEventBusBase.this.processInferenceCommand(cit.next());
348 cit.remove();
349 }
350 }
351
352 @Override
353 public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
354 return inferenceCommandsQueue;
355 }
356
357 };
358 }
359
360 InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> prev) {
361 final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue;
362 switch (getInferencePolicy()) {
363 case EXCLUSIVE:
364 case AFTER_ROOT_EVENT:
365 inferenceCommandsQueue = prev.getInferenceCommandsQueue(); // Inherited.
366 break;
367 case AFTER_EVENT:
368 inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event.
369 break;
370 case AFTER_HANDLER:
371 case IMMEDIATELY:
372 inferenceCommandsQueue = null; // No queue for
373 break;
374 default:
375 throw new IllegalArgumentException("Unexpected inference policy: "+getInferencePolicy());
376 }
377
378 final TrackingExecutorService hes = createExecutorService(prev.getExecutorService(), false, "Handle executor");
379
380 return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
381
382 @Override
383 public int getInferenceChainLength() {
384 return prev.getInferenceChainLength()+1;
385 }
386
387 @Override
388 public TrackingExecutorService getExecutorService() {
389 return hes;
390 }
391
392 @Override
393 public ExceptionHandler getRootHandle() {
394 return prev.getRootHandle();
395 }
396
397 @Override
398 public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
399 return LocalEventBusBase.this;
400 }
401
402 @Override
403 public void setRootHandle(ExceptionHandler rootHandle) {
404 prev.setRootHandle(rootHandle);
405 }
406
407 @Override
408 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
409 return LocalEventBusBase.this.createInferenceContext(this);
410 }
411
412 @Override
413 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
414 return LocalEventBusBase.this.wrapInferenceContext(this);
415 }
416
417
418 @Override
419 public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
420 if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
421 LocalEventBusBase.this.processInferenceCommand(command);
422 } else {
423 inferenceCommandsQueue.add(command);
424 }
425 }
426
427 @Override
428 public void processInferenceCommands() {
429 Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
430 while (cit.hasNext()) {
431 LocalEventBusBase.this.processInferenceCommand(cit.next());
432 cit.remove();
433 }
434 }
435
436 @Override
437 public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
438 return inferenceCommandsQueue;
439 }
440
441 };
442 }
443
444 InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrapInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> master) {
445 if (!InferencePolicy.AFTER_HANDLER.equals(getInferencePolicy())) {
446 throw new IllegalStateException("Can't use this method if inference policy is not AFTER_HANDLER");
447 }
448
449 final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event.
450
451 return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
452
453 @Override
454 public int getInferenceChainLength() {
455 return master.getInferenceChainLength();
456 }
457
458 @Override
459 public TrackingExecutorService getExecutorService() {
460 return master.getExecutorService();
461 }
462
463 @Override
464 public ExceptionHandler getRootHandle() {
465 return master.getRootHandle();
466 }
467
468 @Override
469 public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
470 return LocalEventBusBase.this;
471 }
472
473 @Override
474 public void setRootHandle(ExceptionHandler rootHandle) {
475 throw new UnsupportedOperationException();
476 }
477
478 @Override
479 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
480 return master.createNext();
481 }
482
483 @Override
484 public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
485 return LocalEventBusBase.this.wrapInferenceContext(this);
486 }
487
488 @Override
489 public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
490 if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
491 LocalEventBusBase.this.processInferenceCommand(command);
492 } else {
493 inferenceCommandsQueue.add(command);
494 }
495 }
496
497 @Override
498 public void processInferenceCommands() {
499 Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
500 while (cit.hasNext()) {
501 LocalEventBusBase.this.processInferenceCommand(cit.next());
502 cit.remove();
503 }
504 }
505
506 @Override
507 public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
508 return inferenceCommandsQueue;
509 }
510
511 };
512 }
513
514 @Override
515 protected void processRemoveCommand(RemoveCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
516 ((LocalHandle<E,P,C,S>) command.getHandle()).remove(command.isForUpdate(), command.getInferenceContext());
517 }
518
519 // Iterator<InferenceCommand<E, P, C, K, H, S>> cit = inferenceContext.getInferenceCommandsQueue().iterator();
520 // while (cit.hasNext()) {
521 // processInferenceCommand(cit.next());
522 // cit.remove();
523 // }
524
525 // if (InferencePolicy.IMMEDIATELY.equals(inferenceContext.getBus().getInferencePolicy())) {
526 // ((AbstractEventBus<E,P,C,K,H,S>) inferenceContext.getBus()).processInferenceCommand(postCommand);
527 // } else {
528
529
530
531 }