001 package com.hammurapi.eventbus.local;
002
003 import java.lang.reflect.Array;
004 import java.util.ArrayList;
005 import java.util.Arrays;
006 import java.util.Collections;
007 import java.util.HashSet;
008 import java.util.Iterator;
009 import java.util.List;
010 import java.util.Set;
011 import java.util.concurrent.ExecutorService;
012 import java.util.concurrent.atomic.AtomicBoolean;
013 import java.util.concurrent.atomic.AtomicLong;
014 import java.util.concurrent.locks.Lock;
015 import java.util.concurrent.locks.ReadWriteLock;
016 import java.util.concurrent.locks.ReentrantLock;
017 import java.util.concurrent.locks.ReentrantReadWriteLock;
018 import java.util.logging.Level;
019 import java.util.logging.Logger;
020
021 import com.hammurapi.common.Joiner;
022 import com.hammurapi.common.Joiner.Collector;
023 import com.hammurapi.common.concurrent.TrackingExecutorService;
024 import com.hammurapi.eventbus.AbstractEventBus;
025 import com.hammurapi.eventbus.AbstractEventBus.Handle;
026 import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
027 import com.hammurapi.eventbus.AbstractEventBus.StateSnapshot;
028 import com.hammurapi.eventbus.CompositeContext;
029 import com.hammurapi.eventbus.EventDispatchContext;
030 import com.hammurapi.eventbus.EventDispatchException;
031 import com.hammurapi.eventbus.EventDispatchJoinContext;
032 import com.hammurapi.eventbus.EventDispatchJoinContextFilter;
033 import com.hammurapi.eventbus.EventHandler;
034 import com.hammurapi.eventbus.EventHandlerWrapper;
035 import com.hammurapi.eventbus.EventHandlerWrapperFilter;
036 import com.hammurapi.eventbus.EventStore;
037 import com.hammurapi.eventbus.InferenceCommand;
038 import com.hammurapi.eventbus.InferenceContext;
039 import com.hammurapi.eventbus.InferencePolicy;
040 import com.hammurapi.eventbus.JoinEventHandler;
041 import com.hammurapi.eventbus.PredicateChainingMatcher;
042 import com.hammurapi.eventbus.PredicatedInferenceNode;
043 import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
044 import com.hammurapi.extract.CompositePredicate;
045 import com.hammurapi.extract.Predicate;
046 import com.hammurapi.extract.True;
047
048 public class LocalPredicateChainingMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> extends PredicateChainingMatcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> implements LocalMatcher<E,P,C,S> {
049
050 private static final Logger logger = Logger.getLogger(LocalPredicateChainingMatcher.class.getName());
051
052 private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
053
054 public LocalPredicateChainingMatcher() {
055 rootNode = createPredicatedInferenceNode(null, (Predicate<E, C>) True.getInstance(), null);
056 rootNode.setRoot(true);
057 }
058
059 @Override
060 protected PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> parent, Predicate<E, C> predicate, C context) {
061 return new LocalPredicatedInferenceNode<E, P, C, S>((LocalPredicatedInferenceNode<E, P, C, S>) parent, this, predicate, context, nextId());
062 }
063
064 @Override
065 protected ReadWriteLock getLock() {
066 return lock;
067 }
068
069 @Override
070 protected Set<Long> extractHandlerIds(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> pHandler) {
071 EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> handler = EventHandlerWrapperFilter.peel(pHandler);
072 if (handler instanceof PredicateChainingMatcher.JoinInputEventHandler) {
073 JoinInputEventHandler jieh = (JoinInputEventHandler) handler;
074 return Collections.singleton((long) jieh.getId());
075 }
076 return handler.getRegistrationKeys();
077 }
078
079 @Override
080 protected EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> wrap(final EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> eventHandler, final Long registrationKey) {
081
082 // Construct cAnd
083 final Predicate<E, C> normalized;
084 Predicate<E, C> predicate = eventHandler.getPredicate();
085 if (predicate == null) {
086 normalized = True.getInstance();
087 } else if (predicate instanceof CompositePredicate) {
088 normalized = ((CompositePredicate) predicate).normalize();
089 } else {
090 normalized = predicate;
091 }
092
093 return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>() {
094
095 private Set<Long> regKeys = new HashSet<Long>();
096
097 {
098 regKeys.add(registrationKey);
099 }
100
101 @Override
102 public EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> getHandler() {
103 return eventHandler;
104 }
105
106 @Override
107 public Set<Long> getRegistrationKeys() {
108 return regKeys;
109 }
110
111 @Override
112 public boolean consumes() {
113 return eventHandler.consumes();
114 }
115
116 @Override
117 public int getCardinality() {
118 return eventHandler.getCardinality();
119 }
120
121 @Override
122 public C getContext() {
123 return eventHandler.getContext();
124 }
125
126 @Override
127 public P getPriority() {
128 return eventHandler.getPriority();
129 }
130
131 private AtomicBoolean fired=new AtomicBoolean(false);
132
133
134 @SuppressWarnings("unchecked")
135 @Override
136 public void post(
137 EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context,
138 InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>, S> inferenceContext,
139 Handle<E,P,C,Long>... handles) {
140 if (!isOneOff() || !fired.getAndSet(true)) {
141 E[] events = (E[]) Array.newInstance(getEventBus().getEventType(), handles.length);
142 for (int i=0; i<handles.length; ++i) {
143 if (handles[i].isValid()) {
144 events[i] = handles[i].getEvent();
145 } else {
146 return;
147 }
148 }
149
150 if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
151
152 // if (inferenceContext.getInferenceCommandsQueue()!=null) {
153 // throw new IllegalArgumentException("Inference commands queue shall be null");
154 // }
155
156 inferenceContext = inferenceContext.wrap();
157 }
158
159 LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
160
161 if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
162 if (handles.length==1) {
163 dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
164 inferenceContext,
165 eventHandler,
166 registrationKey,
167 handles,
168 events,
169 Mode.POST);
170 } else {
171 dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
172 inferenceContext,
173 eventHandler,
174 registrationKey,
175 handles,
176 events,
177 (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
178 Mode.POST);
179 }
180
181 LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
182
183 try {
184 if (((LocalEventBusBase<E,P,C,S>) getEventBus()).isAssertPredicatesBeforePost()) {
185 if (!getPredicate().extract(getContext(), null, events)) {
186 throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
187 }
188 }
189 eventHandler.post(dispatchContext, events);
190
191 // If no exception - post events
192 if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
193 inferenceContext.processInferenceCommands();
194 }
195 } catch (Exception e) {
196 logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
197 if (getEventBus().getExceptionHandler()!=null) {
198 getEventBus().getExceptionHandler().handleException(e);
199 }
200 if (inferenceContext.getRootHandle()!=null) {
201 inferenceContext.getRootHandle().handleException(e);
202 }
203 } finally {
204 LocalEventDispatchContextImpl.threadContext.set(null);
205 }
206 if (isOneOff()) {
207 ((LocalEventBusBase<E,P,C,S>) getEventBus()).removeHandlers(registrationKey);
208 }
209 }
210
211 if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
212 RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, getEventBus().getEventType());
213 for (Handle<E,P,C,Long> handle: handles) {
214 ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
215 }
216 }
217 }
218 }
219
220 @Override
221 public void reset() {
222 eventHandler.reset();
223 }
224
225 @Override
226 public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot, Set<Long> taken) {
227 if (taken.add(registrationKey)) {
228 snapshot.handler(registrationKey, eventHandler);
229 }
230 }
231
232 @Override
233 public String toString() {
234 return "Event handler wrapper(regKey = "+registrationKey+", handler = "+eventHandler+", predicate= "+getPredicate()+")";
235 }
236
237 @Override
238 public Predicate<E, C> getPredicate() {
239 return normalized;
240 }
241
242 @Override
243 public boolean isOneOff() {
244 return eventHandler.isOneOff();
245 }
246
247 @Override
248 public Mode getMode() {
249 return eventHandler.getMode();
250 }
251 };
252 }
253
254 private class HandleJoiner extends EventBusJoiner {
255
256 Lock lock = new ReentrantLock();
257
258 private JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
259 private Collector<Handle<E,P,C,Long>[]>[] ic;
260 private int[][] indices;
261
262 public HandleJoiner(
263 Collector<Handle<E,P,C,Long>[]>[] inputCollectors,
264 Class<Handle<E,P,C,Long>[]> inputType,
265 JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler,
266 int[][] indices) {
267
268 super(inputCollectors, inputType, false);
269 this.ic = inputCollectors;
270 this.indices = indices;
271 this.handler = handler;
272 }
273
274 @Override
275 protected void startJoin() {
276 // System.out.println("Lock by "+Thread.currentThread());
277 lock.lock();
278 }
279
280 @Override
281 protected void endJoin() {
282 // System.out.println("UnLock by "+Thread.currentThread());
283 lock.unlock();
284 }
285
286 @Override
287 protected Object join(
288 final Handle<E,P,C,Long>[][] inputs,
289 final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context,
290 final Joiner.InputConsumer consumer,
291 int activator) throws Exception {
292
293 EventDispatchJoinContext<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S> edjc = new EventDispatchJoinContextFilter<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S>(context.getEventDispatchContext()) {
294
295 @Override
296 public void consumeJoin(int index) {
297 consumer.consume(index);
298 }
299
300 @Override
301 public void consumeJoin(E event) {
302 throw new UnsupportedOperationException("Use consumeJoin(int) instead.");
303 }
304 };
305
306 handler.post(edjc, context.getInferenceContext(), inputs);
307 return null;
308 }
309
310 protected boolean partialJoin(
311 AbstractEventBus.Handle<E,P,C,Long>[][] inputs,
312 int index,
313 InputConsumer consumer) throws Exception {
314
315 for (int i=0; i<inputs[index].length; ++i) {
316 if (!inputs[index][i].isValid()) {
317 consumer.consume(index);
318 return false;
319 }
320 }
321 return true;
322 }
323
324 @Override
325 protected void takeSnapshot(
326 Long joinNodeId,
327 com.hammurapi.eventbus.AbstractEventBus.Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot,
328 Set<Long> taken) {
329 for (int i=0; i<ic.length; ++i) {
330 List<Long[]> elements = new ArrayList<Long[]>();
331 Z: for (Handle<E,P,C,Long>[] ha: ic[i]) {
332 for (Handle<E,P,C,Long> h: ha) {
333 if (!h.isValid()) {
334 continue Z;
335 }
336 }
337 Long[] ea = new Long[ha.length];
338 for (int j=0; j<ea.length; ++j) {
339 ea[j] = ha[j].getId();
340 }
341 elements.add(ea);
342 }
343
344 if (snapshot instanceof StateSnapshot) {
345 ((AbstractEventBus.StateSnapshot<E,P,C,Long,AbstractEventBus.Handle<E, P, C, Long>,S>) snapshot).joinInputCollector(joinNodeId, indices[i], elements);
346 }
347 }
348
349 }
350
351 @Override
352 public String toString() {
353 StringBuilder strIndices = new StringBuilder();
354 for (int[] ia: indices) {
355 if (strIndices.length()>0) {
356 strIndices.append(" ");
357 }
358 strIndices.append(Arrays.toString(ia));
359 }
360 return "HandleJoiner [indices=[" + strIndices + "], handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
361 };
362
363 }
364
365 @SuppressWarnings("unchecked")
366 @Override
367 protected EventBusJoiner createJoiner(JoinEventHandler<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S> handler, int[][] indices) {
368
369 Collector<Handle<E,P,C,Long>[]>[] inputCollectors = new Collector[handler.getCardinality()];
370 for (int i=0; i<inputCollectors.length; ++i) {
371 inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>[]>(new ArrayList<Handle<E,P,C,Long>[]>()) {
372
373 /**
374 * Override to replace handles with facades of appropriate type.
375 */
376 public boolean add(AbstractEventBus.Handle<E,P,C,Long>[] handles) {
377 FacadeHandle[] fHandles = (FacadeHandle[]) Array.newInstance(FacadeHandle.class, handles.length);
378 for (int i=0; i<handles.length; ++i) {
379 if (handles[i] instanceof MasterHandle) {
380 fHandles[i] = new FacadeHandle((MasterHandle) handles[i], ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
381 } else if (handles[i] instanceof FacadeHandle) {
382 fHandles[i] = new FacadeHandle(((FacadeHandle) handles[i]).getMaster(), ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
383 } else {
384 throw new IllegalArgumentException("Unexpected handle type: "+handles[i].getClass());
385 }
386 }
387 return super.add(fHandles);
388 };
389
390 public boolean remove(AbstractEventBus.Handle<E,P,C,Long>[] obj) {
391 // System.out.println("Before remove: "+this);
392 Iterator<Handle<E, P, C, Long>[]> it = iterator();
393 Z: while (it.hasNext()) {
394 Handle<E, P, C, Long>[] next = it.next();
395 if (obj.length!=next.length) {
396 return false;
397 }
398
399 for (int i=0; i<obj.length; ++i) {
400 if (!obj[i].getId().equals(next[i].getId())) {
401 continue Z;
402 }
403 }
404 it.remove();
405 return true;
406 }
407
408 return false;
409 };
410 };
411 }
412 Class inputClass = Handle[].class;
413 return new HandleJoiner(inputCollectors, inputClass, handler, indices);
414 }
415
416 private AtomicLong handlerCounter = new AtomicLong(-1);
417
418 @Override
419 protected Long nextId() {
420 LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) getEventBus();
421 return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
422 }
423
424 @Override
425 protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) {
426 return ((LocalEventBusBase<E,P,C,S>) getEventBus()).createExecutorService(master, oneOff, name);
427 }
428
429 }