001    package com.hammurapi.eventbus.local;
002    
003    import java.lang.reflect.Array;
004    import java.util.ArrayList;
005    import java.util.Arrays;
006    import java.util.Collections;
007    import java.util.HashSet;
008    import java.util.Iterator;
009    import java.util.List;
010    import java.util.Set;
011    import java.util.concurrent.ExecutorService;
012    import java.util.concurrent.atomic.AtomicBoolean;
013    import java.util.concurrent.atomic.AtomicLong;
014    import java.util.concurrent.locks.Lock;
015    import java.util.concurrent.locks.ReadWriteLock;
016    import java.util.concurrent.locks.ReentrantLock;
017    import java.util.concurrent.locks.ReentrantReadWriteLock;
018    import java.util.logging.Level;
019    import java.util.logging.Logger;
020    
021    import com.hammurapi.common.Joiner;
022    import com.hammurapi.common.Joiner.Collector;
023    import com.hammurapi.common.concurrent.TrackingExecutorService;
024    import com.hammurapi.eventbus.AbstractEventBus;
025    import com.hammurapi.eventbus.AbstractEventBus.Handle;
026    import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
027    import com.hammurapi.eventbus.AbstractEventBus.StateSnapshot;
028    import com.hammurapi.eventbus.CompositeContext;
029    import com.hammurapi.eventbus.EventDispatchContext;
030    import com.hammurapi.eventbus.EventDispatchException;
031    import com.hammurapi.eventbus.EventDispatchJoinContext;
032    import com.hammurapi.eventbus.EventDispatchJoinContextFilter;
033    import com.hammurapi.eventbus.EventHandler;
034    import com.hammurapi.eventbus.EventHandlerWrapper;
035    import com.hammurapi.eventbus.EventHandlerWrapperFilter;
036    import com.hammurapi.eventbus.EventStore;
037    import com.hammurapi.eventbus.InferenceCommand;
038    import com.hammurapi.eventbus.InferenceContext;
039    import com.hammurapi.eventbus.InferencePolicy;
040    import com.hammurapi.eventbus.JoinEventHandler;
041    import com.hammurapi.eventbus.PredicateChainingMatcher;
042    import com.hammurapi.eventbus.PredicatedInferenceNode;
043    import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
044    import com.hammurapi.extract.CompositePredicate;
045    import com.hammurapi.extract.Predicate;
046    import com.hammurapi.extract.True;
047    
048    public class LocalPredicateChainingMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> extends PredicateChainingMatcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> implements LocalMatcher<E,P,C,S> {
049            
        /** Logger used by the handler wrappers created in this class to report exceptions thrown during event dispatch. */
        private static final Logger logger = Logger.getLogger(LocalPredicateChainingMatcher.class.getName());

        /** Read/write lock instance handed out by {@link #getLock()}. */
        private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
053    
054            public LocalPredicateChainingMatcher() {
055                    rootNode = createPredicatedInferenceNode(null, (Predicate<E, C>) True.getInstance(), null);
056                    rootNode.setRoot(true);                         
057            }
058            
059            @Override
060            protected PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> parent, Predicate<E, C> predicate, C context) {
061                    return new LocalPredicatedInferenceNode<E, P, C, S>((LocalPredicatedInferenceNode<E, P, C, S>) parent, this, predicate, context, nextId());                                         
062            }
063            
        /**
         * Supplies the lock of this matcher.
         *
         * @return the {@link ReentrantReadWriteLock} instance created together with this matcher; the same instance is returned on every call.
         */
        @Override
        protected ReadWriteLock getLock() {
                return lock;
        }
068    
069            @Override
070            protected Set<Long> extractHandlerIds(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> pHandler) {
071                    EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> handler = EventHandlerWrapperFilter.peel(pHandler);
072                    if (handler instanceof PredicateChainingMatcher.JoinInputEventHandler) {
073                            JoinInputEventHandler jieh = (JoinInputEventHandler) handler;
074                            return Collections.singleton((long) jieh.getId());
075                    }
076                    return handler.getRegistrationKeys();
077            }
078            
079            @Override
080            protected EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> wrap(final EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> eventHandler,     final Long registrationKey) {           
081                    
082                    // Construct cAnd
083                    final Predicate<E, C> normalized; 
084                    Predicate<E, C> predicate = eventHandler.getPredicate();
085                    if (predicate == null) {
086                            normalized = True.getInstance();
087                    } else  if (predicate instanceof CompositePredicate) {
088                            normalized = ((CompositePredicate) predicate).normalize();
089                    } else {
090                            normalized = predicate;
091                    }
092                    
093                    return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>() {
094                            
095                            private Set<Long> regKeys = new HashSet<Long>();
096                            
097                            {
098                                    regKeys.add(registrationKey);                                                           
099                            }
100    
101                            @Override
102                            public EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> getHandler() {
103                                    return eventHandler;
104                            }
105    
106                            @Override
107                            public Set<Long> getRegistrationKeys() {
108                                    return regKeys;
109                            }
110    
111                            @Override
112                            public boolean consumes() {
113                                    return eventHandler.consumes();
114                            }
115    
116                            @Override
117                            public int getCardinality() {
118                                    return eventHandler.getCardinality();
119                            }
120    
121                            @Override
122                            public C getContext() {
123                                    return eventHandler.getContext();
124                            }
125    
126                            @Override
127                            public P getPriority() {
128                                    return eventHandler.getPriority();
129                            }
130                            
131                            private AtomicBoolean fired=new AtomicBoolean(false);
132    
133                                                    
134                            @SuppressWarnings("unchecked")
135                            @Override
136                            public void post(
137                                            EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
138                                            InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>, S> inferenceContext, 
139                                            Handle<E,P,C,Long>... handles) {
140                                    if (!isOneOff() || !fired.getAndSet(true)) {
141                                            E[] events = (E[]) Array.newInstance(getEventBus().getEventType(), handles.length);
142                                            for (int i=0; i<handles.length; ++i) {
143                                                    if (handles[i].isValid()) {
144                                                            events[i] = handles[i].getEvent();
145                                                    } else {
146                                                            return;
147                                                    }
148                                            }
149                                            
150                                            if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
151                                                    
152    //                                              if (inferenceContext.getInferenceCommandsQueue()!=null) {
153    //                                                      throw new IllegalArgumentException("Inference commands queue shall be null");
154    //                                              }
155    
156                                                    inferenceContext = inferenceContext.wrap();
157                                            }
158                                            
159                                            LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
160                                            
161                                            if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
162                                                    if (handles.length==1) {
163                                                            dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
164                                                                            inferenceContext, 
165                                                                            eventHandler, 
166                                                                            registrationKey, 
167                                                                            handles, 
168                                                                            events,
169                                                                            Mode.POST);
170                                                    } else {
171                                                            dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
172                                                                            inferenceContext,
173                                                                            eventHandler, 
174                                                                            registrationKey, 
175                                                                            handles, 
176                                                                            events, 
177                                                                            (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
178                                                                            Mode.POST);                                             
179                                                    }                                       
180                                                    
181                                                    LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
182                                                    
183                                                    try {
184                                                            if (((LocalEventBusBase<E,P,C,S>) getEventBus()).isAssertPredicatesBeforePost()) {
185                                                                    if (!getPredicate().extract(getContext(), null, events)) {
186                                                                            throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
187                                                                    }
188                                                            }
189                                                            eventHandler.post(dispatchContext, events);
190                                                            
191                                                            // If no exception - post events
192                                                            if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
193                                                                    inferenceContext.processInferenceCommands();
194                                                            }               
195                                                    } catch (Exception e) {
196                                                            logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
197                                                            if (getEventBus().getExceptionHandler()!=null) {
198                                                                    getEventBus().getExceptionHandler().handleException(e);
199                                                            }
200                                                            if (inferenceContext.getRootHandle()!=null) {
201                                                                    inferenceContext.getRootHandle().handleException(e);
202                                                            }
203                                                    } finally {
204                                                            LocalEventDispatchContextImpl.threadContext.set(null);                                          
205                                                    }
206                                                    if (isOneOff()) {
207                                                            ((LocalEventBusBase<E,P,C,S>) getEventBus()).removeHandlers(registrationKey);
208                                                    }
209                                            }
210                                            
211                                            if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
212                                                    RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, getEventBus().getEventType());
213                                                    for (Handle<E,P,C,Long> handle: handles) {
214                                                            ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
215                                                    }
216                                            }                                       
217                                    }
218                            }
219    
220                            @Override
221                            public void reset() {
222                                    eventHandler.reset();                           
223                            }
224    
225                            @Override
226                            public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot, Set<Long> taken) {
227                                    if (taken.add(registrationKey)) {
228                                            snapshot.handler(registrationKey, eventHandler);
229                                    }
230                            }
231                            
232                            @Override
233                            public String toString() {
234                                    return "Event handler wrapper(regKey = "+registrationKey+", handler = "+eventHandler+", predicate= "+getPredicate()+")";
235                            }
236    
237                            @Override
238                            public Predicate<E, C> getPredicate() {
239                                    return normalized;
240                            }
241    
242                            @Override
243                            public boolean isOneOff() {
244                                    return eventHandler.isOneOff();
245                            }
246    
247                            @Override
248                            public Mode getMode() {
249                                    return eventHandler.getMode();
250                            }
251                    };
252            }
253    
254            private class HandleJoiner extends EventBusJoiner {
255                    
256                    Lock lock = new ReentrantLock();
257    
258                    private JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
259                    private Collector<Handle<E,P,C,Long>[]>[] ic;
260                    private int[][] indices;
261    
262                    public HandleJoiner(
263                                    Collector<Handle<E,P,C,Long>[]>[] inputCollectors, 
264                                    Class<Handle<E,P,C,Long>[]> inputType, 
265                                    JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler, 
266                                    int[][] indices) {
267                            
268                            super(inputCollectors, inputType, false);
269                            this.ic = inputCollectors;
270                            this.indices = indices;
271                            this.handler = handler;
272                    }
273    
274                    @Override
275                    protected void startJoin() {
276    //                      System.out.println("Lock by "+Thread.currentThread());
277                            lock.lock();            
278                    }
279                    
280                    @Override
281                    protected void endJoin() {
282    //                      System.out.println("UnLock by "+Thread.currentThread());
283                            lock.unlock();          
284                    }
285                    
286                    @Override
287                    protected Object join(
288                                    final Handle<E,P,C,Long>[][] inputs, 
289                                    final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context, 
290                                    final Joiner.InputConsumer consumer, 
291                                    int activator) throws Exception {
292                            
293                            EventDispatchJoinContext<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S> edjc = new EventDispatchJoinContextFilter<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S>(context.getEventDispatchContext()) {
294    
295                                    @Override
296                                    public void consumeJoin(int index) {
297                                            consumer.consume(index);
298                                    }
299    
300                                    @Override
301                                    public void consumeJoin(E event) {
302                                            throw new UnsupportedOperationException("Use consumeJoin(int) instead."); 
303                                    }
304                            };
305                            
306                            handler.post(edjc, context.getInferenceContext(), inputs);
307                            return null;
308                    }
309                    
310                    protected boolean partialJoin(
311                                    AbstractEventBus.Handle<E,P,C,Long>[][] inputs, 
312                                    int index, 
313                                    InputConsumer consumer) throws Exception {
314                            
315                            for (int i=0; i<inputs[index].length; ++i) {
316                                    if (!inputs[index][i].isValid()) {
317                                            consumer.consume(index);
318                                            return false;
319                                    }
320                            }
321                            return true;
322                    }
323    
324                    @Override
325                    protected void takeSnapshot(
326                                    Long joinNodeId,
327                                    com.hammurapi.eventbus.AbstractEventBus.Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot,
328                                    Set<Long> taken) {
329                            for (int i=0; i<ic.length; ++i) {
330                                    List<Long[]> elements = new ArrayList<Long[]>();
331                                    Z: for (Handle<E,P,C,Long>[] ha: ic[i]) {
332                                            for (Handle<E,P,C,Long> h: ha) {
333                                                    if (!h.isValid()) {
334                                                            continue Z;
335                                                    }
336                                            }
337                                            Long[] ea = new Long[ha.length];
338                                            for (int j=0; j<ea.length; ++j) {
339                                                    ea[j] = ha[j].getId();
340                                            }
341                                            elements.add(ea);
342                                    }
343                                    
344                                    if (snapshot instanceof StateSnapshot) {
345                                            ((AbstractEventBus.StateSnapshot<E,P,C,Long,AbstractEventBus.Handle<E, P, C, Long>,S>) snapshot).joinInputCollector(joinNodeId, indices[i], elements);
346                                    }
347                            }
348                            
349                    }
350    
351                    @Override
352                    public String toString() {
353                            StringBuilder strIndices = new StringBuilder();
354                            for (int[] ia: indices) {
355                                    if (strIndices.length()>0) {
356                                            strIndices.append(" ");
357                                    }
358                                    strIndices.append(Arrays.toString(ia));
359                            }
360                            return "HandleJoiner [indices=[" + strIndices + "], handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
361                    };
362                    
363            }       
364    
365            @SuppressWarnings("unchecked")
366            @Override
367            protected EventBusJoiner createJoiner(JoinEventHandler<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S> handler, int[][] indices) {
368                    
369                    Collector<Handle<E,P,C,Long>[]>[] inputCollectors = new Collector[handler.getCardinality()];
370                    for (int i=0; i<inputCollectors.length; ++i) {
371                            inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>[]>(new ArrayList<Handle<E,P,C,Long>[]>()) {
372                                    
373                                    /**
374                                     * Override to replace handles with facades of appropriate type.
375                                     */
376                                    public boolean add(AbstractEventBus.Handle<E,P,C,Long>[] handles) {
377                                            FacadeHandle[] fHandles = (FacadeHandle[]) Array.newInstance(FacadeHandle.class, handles.length);
378                                            for (int i=0; i<handles.length; ++i) {
379                                                    if (handles[i] instanceof MasterHandle) {
380                                                            fHandles[i] = new FacadeHandle((MasterHandle) handles[i], ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
381                                                    } else if (handles[i] instanceof FacadeHandle) {
382                                                            fHandles[i] = new FacadeHandle(((FacadeHandle) handles[i]).getMaster(), ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
383                                                    } else {
384                                                            throw new IllegalArgumentException("Unexpected handle type: "+handles[i].getClass());
385                                                    }
386                                            }
387                                            return super.add(fHandles);
388                                    };
389                                    
390                                    public boolean remove(AbstractEventBus.Handle<E,P,C,Long>[] obj) {
391    //                                      System.out.println("Before remove: "+this);
392                                            Iterator<Handle<E, P, C, Long>[]> it = iterator();
393                                            Z: while (it.hasNext()) {
394                                                    Handle<E, P, C, Long>[] next = it.next();
395                                                    if (obj.length!=next.length) {
396                                                            return false;
397                                                    }
398                                                    
399                                                    for (int i=0; i<obj.length; ++i) {
400                                                            if (!obj[i].getId().equals(next[i].getId())) {
401                                                                    continue Z;
402                                                            }
403                                                    }
404                                                    it.remove();
405                                                    return true;
406                                            }
407                                            
408                                            return false;
409                                    };
410                            };
411                    }
412                    Class inputClass = Handle[].class;
413                    return new HandleJoiner(inputCollectors, inputClass, handler, indices);         
414            }
415            
416            private AtomicLong handlerCounter = new AtomicLong(-1);
417    
418            @Override
419            protected Long nextId() {
420                    LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) getEventBus();
421                    return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
422            }
423    
424            @Override
425            protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) {
426                    return ((LocalEventBusBase<E,P,C,S>) getEventBus()).createExecutorService(master, oneOff, name);
427            }
428    
429    }