001 package com.hammurapi.eventbus.local;
002
003 import java.lang.reflect.Array;
004 import java.util.ArrayList;
005 import java.util.Arrays;
006 import java.util.Collection;
007 import java.util.Collections;
008 import java.util.Comparator;
009 import java.util.Iterator;
010 import java.util.List;
011 import java.util.Set;
012 import java.util.concurrent.ExecutorService;
013 import java.util.concurrent.atomic.AtomicBoolean;
014 import java.util.concurrent.atomic.AtomicLong;
015 import java.util.concurrent.locks.Lock;
016 import java.util.concurrent.locks.ReadWriteLock;
017 import java.util.concurrent.locks.ReentrantLock;
018 import java.util.concurrent.locks.ReentrantReadWriteLock;
019 import java.util.logging.Level;
020 import java.util.logging.Logger;
021
022 import com.hammurapi.common.Joiner;
023 import com.hammurapi.common.Joiner.Collector;
024 import com.hammurapi.eventbus.AbstractEventBus;
025 import com.hammurapi.eventbus.AbstractEventBus.Handle;
026 import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
027 import com.hammurapi.eventbus.CompositeContext;
028 import com.hammurapi.eventbus.DispatchNetworkException;
029 import com.hammurapi.eventbus.EventBus;
030 import com.hammurapi.eventbus.EventDispatchContext;
031 import com.hammurapi.eventbus.EventDispatchException;
032 import com.hammurapi.eventbus.EventDispatchJoinContext;
033 import com.hammurapi.eventbus.EventHandler;
034 import com.hammurapi.eventbus.EventHandlerBase.Mode;
035 import com.hammurapi.eventbus.EventHandlerWrapper;
036 import com.hammurapi.eventbus.EventStore;
037 import com.hammurapi.eventbus.InferenceCommand;
038 import com.hammurapi.eventbus.InferenceContext;
039 import com.hammurapi.eventbus.InferencePolicy;
040 import com.hammurapi.eventbus.PredicateChainingMatcher;
041 import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
042 import com.hammurapi.extract.Predicate;
043 import com.hammurapi.extract.True;
044
045 /**
046 * This matcher does matching in a straightforward way - it evaluates handler predicates sequentially, it does not cache extracted values, does not
047 * optimize event joins, and does not use executor service. The purpose of this matcher is to help with debugging and troubleshooting of handler/predicate
048 * logic and to be a baseline of matching performance.
049 *
050 * This matcher doesn't support snapshots.
051 *
052 * @author Pavel Vlasov
053 *
054 * @param <E>
055 * @param <P>
056 * @param <C>
057 * @param <K>
058 * @param <H>
059 * @param <S>
060 */
061 public class LocalSimpleMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> implements LocalMatcher<E,P,C,S> {
062 private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
063 private EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> eventBus;
064
065 protected Collection<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>> wrappers = new ArrayList<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>>();
066 private ReadWriteLock lock = new ReentrantReadWriteLock();
067
068 /**
069 * Lock for the inference network.
070 * @return
071 */
072 protected ReadWriteLock getLock() {
073 return lock;
074 }
075
076 private AtomicLong handlerCounter = new AtomicLong(-1);
077
078 protected Long nextId() {
079 LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) eventBus;
080 return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
081 }
082
083 @Override
084 public Long addHandler(final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler) {
085 if (eventHandler.getCardinality()<1) {
086 throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
087 }
088 getLock().writeLock().lock();
089 try {
090 final Long registrationKey = nextId();
091 final Set<Long> keySet = Collections.unmodifiableSet(Collections.singleton(registrationKey));
092 if (eventHandler.getCardinality()==1) {
093 EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w = new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
094
095 @Override
096 public boolean consumes() {
097 return eventHandler.consumes();
098 }
099
100 @Override
101 public P getPriority() {
102 return eventHandler.getPriority();
103 }
104
105 @Override
106 public int getCardinality() {
107 return eventHandler.getCardinality();
108 }
109
110 @Override
111 public void reset() {
112 eventHandler.reset();
113 }
114
115 @Override
116 public C getContext() {
117 return eventHandler.getContext();
118 }
119
120 @Override
121 public boolean isOneOff() {
122 return eventHandler.isOneOff();
123 }
124
125 @Override
126 public Mode getMode() {
127 return eventHandler.getMode();
128 }
129
130 private AtomicBoolean fired=new AtomicBoolean(false);
131
132 @SuppressWarnings("unchecked")
133 @Override
134 public void post(
135 EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context,
136 InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,
137 Handle<E,P,C,Long>... handles) {
138 if (!isOneOff() || !fired.getAndSet(true)) {
139 E[] events = (E[]) Array.newInstance(eventBus.getEventType(), handles.length);
140 for (int i=0; i<handles.length; ++i) {
141 if (handles[i].isValid()) {
142 events[i] = handles[i].getEvent();
143 } else {
144 return;
145 }
146 }
147
148 if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
149
150 // if (inferenceContext.getInferenceCommandsQueue()!=null) {
151 // throw new IllegalArgumentException("Inference commands queue shall be null");
152 // }
153
154 inferenceContext = inferenceContext.wrap();
155 }
156
157 LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
158
159 if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
160 if (handles.length==1) {
161 dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
162 inferenceContext,
163 eventHandler,
164 registrationKey,
165 handles,
166 events,
167 Mode.POST);
168 } else {
169 dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
170 inferenceContext,
171 eventHandler,
172 registrationKey,
173 handles,
174 events,
175 (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
176 Mode.POST);
177 }
178
179 LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
180 try {
181 if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
182 if (!getPredicate().extract(getContext(), null, events)) {
183 throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
184 }
185 }
186 eventHandler.post(dispatchContext, events);
187
188 // If no exception - post events
189 if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
190 inferenceContext.processInferenceCommands();
191 }
192 } catch (Exception e) {
193 logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
194 if (eventBus.getExceptionHandler()!=null) {
195 eventBus.getExceptionHandler().handleException(e);
196 }
197 if (inferenceContext.getRootHandle()!=null) {
198 inferenceContext.getRootHandle().handleException(e);
199 }
200 } finally {
201 LocalEventDispatchContextImpl.threadContext.set(null);
202 }
203 if (isOneOff()) {
204 ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
205 }
206 }
207
208 if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
209 RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, eventBus.getEventType());
210 for (Handle<E,P,C,Long> handle: handles) {
211 ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
212 }
213 }
214 }
215 }
216
217 @Override
218 public Set<Long> getRegistrationKeys() {
219 return keySet;
220 }
221
222 @Override
223 public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
224 return eventHandler;
225 }
226
227 @Override
228 public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
229 // NOP
230 }
231
232 @Override
233 public Predicate<E, C> getPredicate() {
234 return eventHandler.getPredicate();
235 }
236
237 };
238 wrappers.add(w);
239 } else {
240 HandleJoiner joiner = createJoiner(eventHandler, registrationKey);
241 for (int i=0; i<eventHandler.getCardinality(); ++i) {
242 wrappers.add(createJoinHandler(eventHandler, registrationKey, keySet, joiner, i));
243 }
244 }
245 return registrationKey;
246 } finally {
247 getLock().writeLock().unlock();
248 }
249 }
250
251 private EventHandlerWrapper<E, P, C, Long, Handle<E, P, C, Long>, S> createJoinHandler(
252 final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler,
253 final Long registrationKey,
254 final Set<Long> keySet,
255 final HandleJoiner joiner,
256 final int idx) {
257
258 return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
259
260 @Override
261 public boolean consumes() {
262 return eventHandler.consumes();
263 }
264
265 @Override
266 public P getPriority() {
267 return eventHandler.getPriority();
268 }
269
270 @Override
271 public int getCardinality() {
272 return eventHandler.getCardinality();
273 }
274
275 @Override
276 public void reset() {
277 eventHandler.reset();
278 }
279
280 @Override
281 public C getContext() {
282 return eventHandler.getContext();
283 }
284
285 @Override
286 public boolean isOneOff() {
287 return eventHandler.isOneOff();
288 }
289
290 @Override
291 public Mode getMode() {
292 return eventHandler.getMode();
293 }
294
295 @Override
296 public void post(
297 EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context,
298 InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,
299 Handle<E,P,C,Long>... handles) {
300
301 if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
302
303 // if (inferenceContext.getInferenceCommandsQueue()!=null) {
304 // throw new IllegalArgumentException("Inference commands queue shall be null");
305 // }
306
307 inferenceContext = inferenceContext.wrap();
308 }
309
310 CompositeContext<E, P, C, Long, Handle<E, P, C, Long>, S> compositeContext = new CompositeContext<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S>(
311 context,
312 eventHandler.getContext(),
313 inferenceContext);
314
315 try {
316 joiner.addInput(idx, handles[0], compositeContext);
317 } catch (Exception e) {
318 logger.log(Level.SEVERE, "Join problem: "+e, e);
319 if (eventBus.getExceptionHandler()!=null) {
320 eventBus.getExceptionHandler().handleException(e);
321 }
322 if (inferenceContext.getRootHandle()!=null) {
323 inferenceContext.getRootHandle().handleException(e);
324 }
325 } finally {
326 if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
327 inferenceContext.processInferenceCommands();
328 }
329 }
330
331 }
332
333 @Override
334 public Set<Long> getRegistrationKeys() {
335 return keySet;
336 }
337
338 @Override
339 public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
340 return eventHandler;
341 }
342
343 @Override
344 public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
345 // NOP
346 }
347
348 @Override
349 public Predicate<E, C> getPredicate() {
350 /**
351 * Always returns true because predicate is evaluated in join().
352 */
353 return True.getInstance();
354 }
355
356 };
357 }
358
359 @Override
360 public Iterable<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> match(E event, ExecutorService executorService) {
361 @SuppressWarnings("unchecked")
362 E[] events = (E[]) Array.newInstance(eventBus.getEventType(), 1);
363 events[0] = event;
364
365 List<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> ret = new ArrayList<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>();
366 getLock().readLock().lock();
367 try {
368 for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
369 if (w.getPredicate().extract(w.getContext(), null, events)) {
370 ret.add(w);
371 }
372 }
373 } finally {
374 getLock().readLock().unlock();
375 }
376
377 Collections.sort(ret, new Comparator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>() {
378
379 @Override
380 public int compare(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o1, EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o2) {
381 P p1 = o1.getPriority();
382 P p2 = o2.getPriority();
383 if (p2==null) {
384 return p1==null ? o2.hashCode() - o1.hashCode() : 1;
385 }
386 if (p1==null) {
387 return -1;
388 }
389 return p2.compareTo(p1);
390 }
391
392 });
393
394 return ret;
395 }
396
397 @Override
398 public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot) {
399 // TODO - Implement.
400 throw new UnsupportedOperationException("LocalSimpleMatcher does not support takeSnapshot() operation");
401 }
402
403 @Override
404 public void removeHandlers(Iterable<Long> keys) {
405 getLock().writeLock().lock();
406 try {
407 for (Long key: keys) {
408 Iterator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> it = wrappers.iterator();
409 Set<Long> rKeys = it.next().getRegistrationKeys();
410 for (Long rKey: rKeys) {
411 if (rKey.equals(key)) {
412 it.remove();
413 break;
414 }
415 }
416 }
417 } finally {
418 getLock().writeLock().unlock();
419 }
420 }
421
422 @Override
423 public void reset() {
424 getLock().readLock().lock();
425 try {
426 for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
427 w.reset();
428 }
429 } finally {
430 getLock().readLock().unlock();
431 }
432 }
433
434 @Override
435 public void manageHandlers(HandlerManager<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handlerManager) {
436 getLock().writeLock().lock();
437 try {
438 handlerManager.manageHandlers(this);
439 } finally {
440 getLock().writeLock().unlock();
441 }
442 }
443
444 @Override
445 public void setEventBus(EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> bus) {
446 this.eventBus = bus;
447 }
448
449 private class HandleJoiner extends Joiner<Handle<E,P,C,Long>, CompositeContext<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>, Object> {
450
451 Lock lock = new ReentrantLock();
452
453 private EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
454 private Collector<Handle<E,P,C,Long>>[] ic;
455
456 private Long registrationKey;
457
458 public HandleJoiner(
459 Collector<Handle<E,P,C,Long>>[] inputCollectors,
460 Class<Handle<E,P,C,Long>> inputType,
461 EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler,
462 Long registrationKey) {
463
464 super(inputCollectors, inputType, false);
465 this.ic = inputCollectors;
466 this.handler = handler;
467 this.registrationKey = registrationKey;
468 }
469
470 @Override
471 protected void startJoin() {
472 // System.out.println("Lock by "+Thread.currentThread());
473 lock.lock();
474 }
475
476 @Override
477 protected void endJoin() {
478 // System.out.println("UnLock by "+Thread.currentThread());
479 lock.unlock();
480 }
481
482 private AtomicBoolean fired=new AtomicBoolean(false);
483
484 @Override
485 protected Object join(
486 final Handle<E,P,C,Long>[] inputs,
487 final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context,
488 final Joiner.InputConsumer consumer,
489 int activator) throws Exception {
490
491 if (!handler.isOneOff() || !fired.getAndSet(true)) {
492 final E[] events = (E[]) Array.newInstance(eventBus.getEventType(), inputs.length);
493 for (int i=0; i<inputs.length; ++i) {
494 if (inputs[i].isValid()) {
495 events[i] = inputs[i].getEvent();
496 } else {
497 return null;
498 }
499 }
500
501 if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
502 return null;
503 }
504
505 if (handler.getMode()==Mode.POST || handler.getMode()==Mode.BOTH) {
506 LocalEventDispatchContextImpl<E,P,C,S> dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
507 context.getInferenceContext(),
508 handler,
509 registrationKey,
510 inputs,
511 events,
512 (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context.getEventDispatchContext(),
513 Mode.POST) {
514
515 @Override
516 public void consumeJoin(int index) {
517 consumer.consume(index);
518 }
519
520 @Override
521 public void consumeJoin(E event) {
522 for (int i=0; i<events.length; ++i) {
523 if (event==events[i]) {
524 consumeJoin(i);
525 break;
526 }
527 }
528 }
529 };
530
531 LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
532 try {
533 if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
534 if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
535 throw new EventDispatchException("Handler predicate evaluated to false before post: "+handler.getPredicate()+", handler "+handler);
536 }
537 }
538 handler.post(dispatchContext, events);
539 } catch (Exception e) {
540 logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
541 if (eventBus.getExceptionHandler()!=null) {
542 eventBus.getExceptionHandler().handleException(e);
543 }
544 if (context.getInferenceContext().getRootHandle()!=null) {
545 context.getInferenceContext().getRootHandle().handleException(e);
546 }
547 } finally {
548 LocalEventDispatchContextImpl.threadContext.set(null);
549 }
550 if (handler.isOneOff()) {
551 ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
552
553 }
554 }
555
556 if (handler.getMode()==Mode.REMOVE || handler.getMode()==Mode.BOTH) {
557 RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(handler, registrationKey, inputs, eventBus.getEventType());
558 for (Handle<E,P,C,Long> handle: inputs) {
559 ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
560 }
561 }
562
563 }
564 return null;
565 }
566
567 @Override
568 public String toString() {
569 return "Simple HandleJoiner [handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
570 };
571
572 }
573
574 @SuppressWarnings("unchecked")
575 protected HandleJoiner createJoiner(EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler, Long registrationKey) {
576
577 Collector<Handle<E,P,C,Long>>[] inputCollectors = new Collector[eventHandler.getCardinality()];
578 for (int i=0; i<inputCollectors.length; ++i) {
579 inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>>(new ArrayList<Handle<E,P,C,Long>>() /* createCollection()*/) {
580
581 /**
582 * Override to replace handles with facades of appropriate type.
583 */
584 public boolean add(AbstractEventBus.Handle<E,P,C,Long> handle) {
585 if (handle instanceof MasterHandle) {
586 return super.add(new FacadeHandle((MasterHandle) handle, ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
587 } else if (handle instanceof FacadeHandle) {
588 return super.add(new FacadeHandle(((FacadeHandle) handle).getMaster(), ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
589 } else {
590 throw new IllegalArgumentException("Unexpected handle type: "+handle.getClass());
591 }
592 };
593
594 public boolean remove(AbstractEventBus.Handle<E,P,C,Long> obj) {
595 // System.out.println("Before remove: "+this);
596 Iterator<Handle<E, P, C, Long>> it = iterator();
597 while (it.hasNext()) {
598 if (obj.getId().equals(it.next().getId())) {
599 it.remove();
600 return true;
601 }
602 }
603
604 return false;
605 };
606 };
607 }
608 Class inputClass = Handle.class;
609 return new HandleJoiner(inputCollectors, inputClass, eventHandler, registrationKey);
610 }
611
612 // private Collection<Handle<E, P, C, Long>> createCollection() {
613 // return InvocationRecordingProxyFactory.wrap(Collection.class, new ArrayList<Handle<E,P,C,Long>>(), new ProblemListener() {
614 //
615 // @Override
616 // public void onProblem(Throwable th, List<InvocationRecord> records) {
617 // Object[] ra = records.toArray();
618 // for (int i=0; i<ra.length; ++i) {
619 // System.out.println(i+" "+ra[i]);
620 // }
621 //
622 // th.printStackTrace();
623 // }
624 //
625 // });
626 // }
627
628
629 }