EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus.local]

COVERAGE SUMMARY FOR SOURCE FILE [LocalPredicateChainingMatcher.java]

nameclass, %method, %block, %line, %
LocalPredicateChainingMatcher.java100% (5/5)86%  (32/37)79%  (676/856)83%  (128.9/156)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class LocalPredicateChainingMatcher$HandleJoiner100% (1/1)86%  (6/7)72%  (151/211)76%  (28/37)
toString (): String 0%   (0/1)0%   (0/54)0%   (0/6)
partialJoin (AbstractEventBus$Handle [][], int, Joiner$InputConsumer): boolean 100% (1/1)79%  (19/24)60%  (3/5)
takeSnapshot (Long, AbstractEventBus$Snapshot, Set): void 100% (1/1)99%  (83/84)92%  (12/13)
LocalPredicateChainingMatcher$HandleJoiner (LocalPredicateChainingMatcher, Jo... 100% (1/1)100% (24/24)100% (6/6)
endJoin (): void 100% (1/1)100% (4/4)100% (2/2)
join (AbstractEventBus$Handle [][], CompositeContext, Joiner$InputConsumer, i... 100% (1/1)100% (17/17)100% (3/3)
startJoin (): void 100% (1/1)100% (4/4)100% (2/2)
     
class LocalPredicateChainingMatcher$1100% (1/1)86%  (12/14)74%  (260/352)81%  (55.2/68)
getMode (): EventHandlerBase$Mode 0%   (0/1)0%   (0/4)0%   (0/1)
toString (): String 0%   (0/1)0%   (0/21)0%   (0/1)
post (EventDispatchContext, InferenceContext, AbstractEventBus$Handle []): void 100% (1/1)73%  (184/251)78%  (37.2/48)
LocalPredicateChainingMatcher$1 (LocalPredicateChainingMatcher, Long, EventHa... 100% (1/1)100% (31/31)100% (5/5)
consumes (): boolean 100% (1/1)100% (4/4)100% (1/1)
getCardinality (): int 100% (1/1)100% (4/4)100% (1/1)
getContext (): Object 100% (1/1)100% (4/4)100% (1/1)
getHandler (): EventHandler 100% (1/1)100% (3/3)100% (1/1)
getPredicate (): Predicate 100% (1/1)100% (3/3)100% (1/1)
getPriority (): Comparable 100% (1/1)100% (4/4)100% (1/1)
getRegistrationKeys (): Set 100% (1/1)100% (3/3)100% (1/1)
isOneOff (): boolean 100% (1/1)100% (4/4)100% (1/1)
reset (): void 100% (1/1)100% (4/4)100% (2/2)
takeSnapshot (AbstractEventBus$Snapshot, Set): void 100% (1/1)100% (12/12)100% (3/3)
     
class LocalPredicateChainingMatcher$HandleJoiner$1100% (1/1)67%  (2/3)75%  (15/20)80%  (4/5)
consumeJoin (Object): void 0%   (0/1)0%   (0/5)0%   (0/1)
LocalPredicateChainingMatcher$HandleJoiner$1 (LocalPredicateChainingMatcher$H... 100% (1/1)100% (10/10)100% (2/2)
consumeJoin (int): void 100% (1/1)100% (5/5)100% (2/2)
     
class LocalPredicateChainingMatcher$2100% (1/1)100% (3/3)86%  (108/126)86%  (18/21)
add (AbstractEventBus$Handle []): boolean 100% (1/1)81%  (61/75)88%  (7/8)
remove (AbstractEventBus$Handle []): boolean 100% (1/1)91%  (40/44)82%  (9/11)
LocalPredicateChainingMatcher$2 (LocalPredicateChainingMatcher, Collection): ... 100% (1/1)100% (7/7)100% (2/2)
     
class LocalPredicateChainingMatcher100% (1/1)90%  (9/10)97%  (142/147)96%  (28.7/30)
access$2 (): Logger 0%   (0/1)0%   (0/2)0%   (0/1)
wrap (EventHandler, Long): EventHandlerWrapper 100% (1/1)88%  (23/26)86%  (6/7)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
LocalPredicateChainingMatcher (): void 100% (1/1)100% (25/25)100% (6/6)
createExecutorService (ExecutorService, boolean, String): TrackingExecutorSer... 100% (1/1)100% (8/8)100% (1/1)
createJoiner (JoinEventHandler, int [][]): PredicateChainingMatcher$EventBusJ... 100% (1/1)100% (33/33)100% (5/5)
createPredicatedInferenceNode (PredicatedInferenceNode, Predicate, Object): P... 100% (1/1)100% (11/11)100% (1/1)
extractHandlerIds (EventHandlerWrapper): Set 100% (1/1)100% (19/19)100% (5/5)
getLock (): ReadWriteLock 100% (1/1)100% (3/3)100% (1/1)
nextId (): Long 100% (1/1)100% (15/15)100% (2/2)

1package com.hammurapi.eventbus.local;
2 
3import java.lang.reflect.Array;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.Collections;
7import java.util.HashSet;
8import java.util.Iterator;
9import java.util.List;
10import java.util.Set;
11import java.util.concurrent.ExecutorService;
12import java.util.concurrent.atomic.AtomicBoolean;
13import java.util.concurrent.atomic.AtomicLong;
14import java.util.concurrent.locks.Lock;
15import java.util.concurrent.locks.ReadWriteLock;
16import java.util.concurrent.locks.ReentrantLock;
17import java.util.concurrent.locks.ReentrantReadWriteLock;
18import java.util.logging.Level;
19import java.util.logging.Logger;
20 
21import com.hammurapi.common.Joiner;
22import com.hammurapi.common.Joiner.Collector;
23import com.hammurapi.common.concurrent.TrackingExecutorService;
24import com.hammurapi.eventbus.AbstractEventBus;
25import com.hammurapi.eventbus.AbstractEventBus.Handle;
26import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
27import com.hammurapi.eventbus.AbstractEventBus.StateSnapshot;
28import com.hammurapi.eventbus.CompositeContext;
29import com.hammurapi.eventbus.EventDispatchContext;
30import com.hammurapi.eventbus.EventDispatchException;
31import com.hammurapi.eventbus.EventDispatchJoinContext;
32import com.hammurapi.eventbus.EventDispatchJoinContextFilter;
33import com.hammurapi.eventbus.EventHandler;
34import com.hammurapi.eventbus.EventHandlerWrapper;
35import com.hammurapi.eventbus.EventHandlerWrapperFilter;
36import com.hammurapi.eventbus.EventStore;
37import com.hammurapi.eventbus.InferenceCommand;
38import com.hammurapi.eventbus.InferenceContext;
39import com.hammurapi.eventbus.InferencePolicy;
40import com.hammurapi.eventbus.JoinEventHandler;
41import com.hammurapi.eventbus.PredicateChainingMatcher;
42import com.hammurapi.eventbus.PredicatedInferenceNode;
43import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
44import com.hammurapi.extract.CompositePredicate;
45import com.hammurapi.extract.Predicate;
46import com.hammurapi.extract.True;
47 
48public class LocalPredicateChainingMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> extends        PredicateChainingMatcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> implements LocalMatcher<E,P,C,S> {
49        
50        private static final Logger logger = Logger.getLogger(LocalPredicateChainingMatcher.class.getName());
51                
52        private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
53 
54        public LocalPredicateChainingMatcher() {
55                rootNode = createPredicatedInferenceNode(null, (Predicate<E, C>) True.getInstance(), null);
56                rootNode.setRoot(true);                                
57        }
58        
59        @Override
60        protected PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> createPredicatedInferenceNode(PredicatedInferenceNode<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> parent, Predicate<E, C> predicate, C context) {
61                return new LocalPredicatedInferenceNode<E, P, C, S>((LocalPredicatedInferenceNode<E, P, C, S>) parent, this, predicate, context, nextId());                                                
62        }
63        
64        @Override
65        protected ReadWriteLock getLock() {
66                return lock;
67        }
68 
69        @Override
70        protected Set<Long> extractHandlerIds(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> pHandler) {
71                EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> handler = EventHandlerWrapperFilter.peel(pHandler);
72                if (handler instanceof PredicateChainingMatcher.JoinInputEventHandler) {
73                        JoinInputEventHandler jieh = (JoinInputEventHandler) handler;
74                        return Collections.singleton((long) jieh.getId());
75                }
76                return handler.getRegistrationKeys();
77        }
78        
79        @Override
80        protected EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> wrap(final EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> eventHandler,        final Long registrationKey) {                
81                
82                // Construct cAnd
83                final Predicate<E, C> normalized; 
84                Predicate<E, C> predicate = eventHandler.getPredicate();
85                if (predicate == null) {
86                        normalized = True.getInstance();
87                } else  if (predicate instanceof CompositePredicate) {
88                        normalized = ((CompositePredicate) predicate).normalize();
89                } else {
90                        normalized = predicate;
91                }
92                
93                return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>() {
94                        
95                        private Set<Long> regKeys = new HashSet<Long>();
96                        
97                        {
98                                regKeys.add(registrationKey);                                                                
99                        }
100 
101                        @Override
102                        public EventHandler<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S> getHandler() {
103                                return eventHandler;
104                        }
105 
106                        @Override
107                        public Set<Long> getRegistrationKeys() {
108                                return regKeys;
109                        }
110 
111                        @Override
112                        public boolean consumes() {
113                                return eventHandler.consumes();
114                        }
115 
116                        @Override
117                        public int getCardinality() {
118                                return eventHandler.getCardinality();
119                        }
120 
121                        @Override
122                        public C getContext() {
123                                return eventHandler.getContext();
124                        }
125 
126                        @Override
127                        public P getPriority() {
128                                return eventHandler.getPriority();
129                        }
130                        
131                        private AtomicBoolean fired=new AtomicBoolean(false);
132 
133                                                
134                        @SuppressWarnings("unchecked")
135                        @Override
136                        public void post(
137                                        EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
138                                        InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>, S> inferenceContext, 
139                                        Handle<E,P,C,Long>... handles) {
140                                if (!isOneOff() || !fired.getAndSet(true)) {
141                                        E[] events = (E[]) Array.newInstance(getEventBus().getEventType(), handles.length);
142                                        for (int i=0; i<handles.length; ++i) {
143                                                if (handles[i].isValid()) {
144                                                        events[i] = handles[i].getEvent();
145                                                } else {
146                                                        return;
147                                                }
148                                        }
149                                        
150                                        if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
151                                                
152//                                                if (inferenceContext.getInferenceCommandsQueue()!=null) {
153//                                                        throw new IllegalArgumentException("Inference commands queue shall be null");
154//                                                }
155 
156                                                inferenceContext = inferenceContext.wrap();
157                                        }
158                                        
159                                        LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
160                                        
161                                        if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
162                                                if (handles.length==1) {
163                                                        dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
164                                                                        inferenceContext, 
165                                                                        eventHandler, 
166                                                                        registrationKey, 
167                                                                        handles, 
168                                                                        events,
169                                                                        Mode.POST);
170                                                } else {
171                                                        dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
172                                                                        inferenceContext,
173                                                                        eventHandler, 
174                                                                        registrationKey, 
175                                                                        handles, 
176                                                                        events, 
177                                                                        (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
178                                                                        Mode.POST);                                                
179                                                }                                        
180                                                
181                                                LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
182                                                
183                                                try {
184                                                        if (((LocalEventBusBase<E,P,C,S>) getEventBus()).isAssertPredicatesBeforePost()) {
185                                                                if (!getPredicate().extract(getContext(), null, events)) {
186                                                                        throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
187                                                                }
188                                                        }
189                                                        eventHandler.post(dispatchContext, events);
190                                                        
191                                                        // If no exception - post events
192                                                        if (handles.length==1 && InferencePolicy.AFTER_HANDLER.equals(getEventBus().getInferencePolicy())) {
193                                                                inferenceContext.processInferenceCommands();
194                                                        }                
195                                                } catch (Exception e) {
196                                                        logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
197                                                        if (getEventBus().getExceptionHandler()!=null) {
198                                                                getEventBus().getExceptionHandler().handleException(e);
199                                                        }
200                                                        if (inferenceContext.getRootHandle()!=null) {
201                                                                inferenceContext.getRootHandle().handleException(e);
202                                                        }
203                                                } finally {
204                                                        LocalEventDispatchContextImpl.threadContext.set(null);                                                
205                                                }
206                                                if (isOneOff()) {
207                                                        ((LocalEventBusBase<E,P,C,S>) getEventBus()).removeHandlers(registrationKey);
208                                                }
209                                        }
210                                        
211                                        if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
212                                                RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, getEventBus().getEventType());
213                                                for (Handle<E,P,C,Long> handle: handles) {
214                                                        ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
215                                                }
216                                        }                                        
217                                }
218                        }
219 
220                        @Override
221                        public void reset() {
222                                eventHandler.reset();                                
223                        }
224 
225                        @Override
226                        public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot, Set<Long> taken) {
227                                if (taken.add(registrationKey)) {
228                                        snapshot.handler(registrationKey, eventHandler);
229                                }
230                        }
231                        
232                        @Override
233                        public String toString() {
234                                return "Event handler wrapper(regKey = "+registrationKey+", handler = "+eventHandler+", predicate= "+getPredicate()+")";
235                        }
236 
237                        @Override
238                        public Predicate<E, C> getPredicate() {
239                                return normalized;
240                        }
241 
242                        @Override
243                        public boolean isOneOff() {
244                                return eventHandler.isOneOff();
245                        }
246 
247                        @Override
248                        public Mode getMode() {
249                                return eventHandler.getMode();
250                        }
251                };
252        }
253 
254        private class HandleJoiner extends EventBusJoiner {
255                
256                Lock lock = new ReentrantLock();
257 
258                private JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
259                private Collector<Handle<E,P,C,Long>[]>[] ic;
260                private int[][] indices;
261 
262                public HandleJoiner(
263                                Collector<Handle<E,P,C,Long>[]>[] inputCollectors, 
264                                Class<Handle<E,P,C,Long>[]> inputType, 
265                                JoinEventHandler<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handler, 
266                                int[][] indices) {
267                        
268                        super(inputCollectors, inputType, false);
269                        this.ic = inputCollectors;
270                        this.indices = indices;
271                        this.handler = handler;
272                }
273 
274                @Override
275                protected void startJoin() {
276//                        System.out.println("Lock by "+Thread.currentThread());
277                        lock.lock();                
278                }
279                
280                @Override
281                protected void endJoin() {
282//                        System.out.println("UnLock by "+Thread.currentThread());
283                        lock.unlock();                
284                }
285                
286                @Override
287                protected Object join(
288                                final Handle<E,P,C,Long>[][] inputs, 
289                                final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context, 
290                                final Joiner.InputConsumer consumer, 
291                                int activator) throws Exception {
292                        
293                        EventDispatchJoinContext<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S> edjc = new EventDispatchJoinContextFilter<E,P,C,AbstractEventBus.Handle<E,P,C,Long>,S>(context.getEventDispatchContext()) {
294 
295                                @Override
296                                public void consumeJoin(int index) {
297                                        consumer.consume(index);
298                                }
299 
300                                @Override
301                                public void consumeJoin(E event) {
302                                        throw new UnsupportedOperationException("Use consumeJoin(int) instead."); 
303                                }
304                        };
305                        
306                        handler.post(edjc, context.getInferenceContext(), inputs);
307                        return null;
308                }
309                
310                protected boolean partialJoin(
311                                AbstractEventBus.Handle<E,P,C,Long>[][] inputs, 
312                                int index, 
313                                InputConsumer consumer) throws Exception {
314                        
315                        for (int i=0; i<inputs[index].length; ++i) {
316                                if (!inputs[index][i].isValid()) {
317                                        consumer.consume(index);
318                                        return false;
319                                }
320                        }
321                        return true;
322                }
323 
324                @Override
325                protected void takeSnapshot(
326                                Long joinNodeId,
327                                com.hammurapi.eventbus.AbstractEventBus.Snapshot<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S> snapshot,
328                                Set<Long> taken) {
329                        for (int i=0; i<ic.length; ++i) {
330                                List<Long[]> elements = new ArrayList<Long[]>();
331                                Z: for (Handle<E,P,C,Long>[] ha: ic[i]) {
332                                        for (Handle<E,P,C,Long> h: ha) {
333                                                if (!h.isValid()) {
334                                                        continue Z;
335                                                }
336                                        }
337                                        Long[] ea = new Long[ha.length];
338                                        for (int j=0; j<ea.length; ++j) {
339                                                ea[j] = ha[j].getId();
340                                        }
341                                        elements.add(ea);
342                                }
343                                
344                                if (snapshot instanceof StateSnapshot) {
345                                        ((AbstractEventBus.StateSnapshot<E,P,C,Long,AbstractEventBus.Handle<E, P, C, Long>,S>) snapshot).joinInputCollector(joinNodeId, indices[i], elements);
346                                }
347                        }
348                        
349                }
350 
351                @Override
352                public String toString() {
353                        StringBuilder strIndices = new StringBuilder();
354                        for (int[] ia: indices) {
355                                if (strIndices.length()>0) {
356                                        strIndices.append(" ");
357                                }
358                                strIndices.append(Arrays.toString(ia));
359                        }
360                        return "HandleJoiner [indices=[" + strIndices + "], handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
361                };
362                
363        }        
364 
365        @SuppressWarnings("unchecked")
366        @Override
367        protected EventBusJoiner createJoiner(JoinEventHandler<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S> handler, int[][] indices) {
368                
369                Collector<Handle<E,P,C,Long>[]>[] inputCollectors = new Collector[handler.getCardinality()];
370                for (int i=0; i<inputCollectors.length; ++i) {
371                        inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>[]>(new ArrayList<Handle<E,P,C,Long>[]>()) {
372                                
373                                /**
374                                 * Override to replace handles with facades of appropriate type.
375                                 */
376                                public boolean add(AbstractEventBus.Handle<E,P,C,Long>[] handles) {
377                                        FacadeHandle[] fHandles = (FacadeHandle[]) Array.newInstance(FacadeHandle.class, handles.length);
378                                        for (int i=0; i<handles.length; ++i) {
379                                                if (handles[i] instanceof MasterHandle) {
380                                                        fHandles[i] = new FacadeHandle((MasterHandle) handles[i], ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
381                                                } else if (handles[i] instanceof FacadeHandle) {
382                                                        fHandles[i] = new FacadeHandle(((FacadeHandle) handles[i]).getMaster(), ((LocalEventBusBase<E,P,C,S>) getEventBus()).getCollectorHandleStrength());
383                                                } else {
384                                                        throw new IllegalArgumentException("Unexpected handle type: "+handles[i].getClass());
385                                                }
386                                        }
387                                        return super.add(fHandles);
388                                };
389                                
390                                public boolean remove(AbstractEventBus.Handle<E,P,C,Long>[] obj) {
391//                                        System.out.println("Before remove: "+this);
392                                        Iterator<Handle<E, P, C, Long>[]> it = iterator();
393                                        Z: while (it.hasNext()) {
394                                                Handle<E, P, C, Long>[] next = it.next();
395                                                if (obj.length!=next.length) {
396                                                        return false;
397                                                }
398                                                
399                                                for (int i=0; i<obj.length; ++i) {
400                                                        if (!obj[i].getId().equals(next[i].getId())) {
401                                                                continue Z;
402                                                        }
403                                                }
404                                                it.remove();
405                                                return true;
406                                        }
407                                        
408                                        return false;
409                                };
410                        };
411                }
412                Class inputClass = Handle[].class;
413                return new HandleJoiner(inputCollectors, inputClass, handler, indices);                
414        }
415        
416        private AtomicLong handlerCounter = new AtomicLong(-1);
417 
418        @Override
419        protected Long nextId() {
420                LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) getEventBus();
421                return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
422        }
423 
424        @Override
425        protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) {
426                return ((LocalEventBusBase<E,P,C,S>) getEventBus()).createExecutorService(master, oneOff, name);
427        }
428 
429}

[all classes][com.hammurapi.eventbus.local]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov