EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus.local]

COVERAGE SUMMARY FOR SOURCE FILE [LocalSimpleMatcher.java]

name | class, % | method, % | block, % | line, %
LocalSimpleMatcher.java | 100% (7/7) | 74%  (40/54) | 74%  (914/1235) | 72%  (169.7/237)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class LocalSimpleMatcher$4100% (1/1)100% (3/3)58%  (43/74)69%  (9/13)
add (AbstractEventBus$Handle): boolean 100% (1/1)36%  (16/45)40%  (2/5)
remove (AbstractEventBus$Handle): boolean 100% (1/1)91%  (20/22)83%  (5/6)
LocalSimpleMatcher$4 (LocalSimpleMatcher, Collection): void 100% (1/1)100% (7/7)100% (2/2)
     
class LocalSimpleMatcher$3100% (1/1)100% (2/2)62%  (20/32)78%  (7/9)
compare (EventHandlerWrapper, EventHandlerWrapper): int 100% (1/1)54%  (14/26)71%  (5/7)
LocalSimpleMatcher$3 (LocalSimpleMatcher): void 100% (1/1)100% (6/6)100% (2/2)
     
class LocalSimpleMatcher$HandleJoiner100% (1/1)80%  (4/5)63%  (183/289)73%  (36.7/50)
toString (): String 0%   (0/1)0%   (0/17)0%   (0/1)
join (AbstractEventBus$Handle [], CompositeContext, Joiner$InputConsumer, int... 100% (1/1)62%  (146/235)68%  (25.7/38)
LocalSimpleMatcher$HandleJoiner (LocalSimpleMatcher, Joiner$Collector [], Cla... 100% (1/1)100% (29/29)100% (7/7)
endJoin (): void 100% (1/1)100% (4/4)100% (2/2)
startJoin (): void 100% (1/1)100% (4/4)100% (2/2)
     
class LocalSimpleMatcher$2100% (1/1)46%  (6/13)70%  (103/147)62%  (19.9/32)
getCardinality (): int 0%   (0/1)0%   (0/4)0%   (0/1)
getHandler (): EventHandler 0%   (0/1)0%   (0/3)0%   (0/1)
getMode (): EventHandlerBase$Mode 0%   (0/1)0%   (0/4)0%   (0/1)
getRegistrationKeys (): Set 0%   (0/1)0%   (0/3)0%   (0/1)
isOneOff (): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
reset (): void 0%   (0/1)0%   (0/4)0%   (0/2)
takeSnapshot (AbstractEventBus$Snapshot, Set): void 0%   (0/1)0%   (0/1)0%   (0/1)
post (EventDispatchContext, InferenceContext, AbstractEventBus$Handle []): void 100% (1/1)77%  (71/92)77%  (13.9/18)
LocalSimpleMatcher$2 (LocalSimpleMatcher, EventHandler, LocalSimpleMatcher$Ha... 100% (1/1)100% (18/18)100% (2/2)
consumes (): boolean 100% (1/1)100% (4/4)100% (1/1)
getContext (): Object 100% (1/1)100% (4/4)100% (1/1)
getPredicate (): Predicate 100% (1/1)100% (2/2)100% (1/1)
getPriority (): Comparable 100% (1/1)100% (4/4)100% (1/1)
     
class LocalSimpleMatcher$1100% (1/1)69%  (9/13)79%  (238/303)72%  (45.5/63)
getCardinality (): int 0%   (0/1)0%   (0/4)0%   (0/1)
getHandler (): EventHandler 0%   (0/1)0%   (0/3)0%   (0/1)
getMode (): EventHandlerBase$Mode 0%   (0/1)0%   (0/4)0%   (0/1)
takeSnapshot (AbstractEventBus$Snapshot, Set): void 0%   (0/1)0%   (0/1)0%   (0/1)
post (EventDispatchContext, InferenceContext, AbstractEventBus$Handle []): void 100% (1/1)78%  (190/243)72%  (34.5/48)
LocalSimpleMatcher$1 (LocalSimpleMatcher, EventHandler, Long, Set): void 100% (1/1)100% (21/21)100% (3/3)
consumes (): boolean 100% (1/1)100% (4/4)100% (1/1)
getContext (): Object 100% (1/1)100% (4/4)100% (1/1)
getPredicate (): Predicate 100% (1/1)100% (4/4)100% (1/1)
getPriority (): Comparable 100% (1/1)100% (4/4)100% (1/1)
getRegistrationKeys (): Set 100% (1/1)100% (3/3)100% (1/1)
isOneOff (): boolean 100% (1/1)100% (4/4)100% (1/1)
reset (): void 100% (1/1)100% (4/4)100% (2/2)
     
class LocalSimpleMatcher100% (1/1)87%  (13/15)82%  (284/346)74%  (51.7/70)
manageHandlers (Matcher$HandlerManager): void 0%   (0/1)0%   (0/20)0%   (0/6)
takeSnapshot (AbstractEventBus$Snapshot): void 0%   (0/1)0%   (0/5)0%   (0/1)
nextId (): Long 100% (1/1)73%  (11/15)86%  (1.7/2)
reset (): void 100% (1/1)77%  (24/31)63%  (4.4/7)
addHandler (EventHandler): Long 100% (1/1)84%  (65/77)77%  (11.5/15)
removeHandlers (Iterable): void 100% (1/1)87%  (48/55)78%  (9.4/12)
match (Object, ExecutorService): Iterable 100% (1/1)89%  (58/65)80%  (9.6/12)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
LocalSimpleMatcher (): void 100% (1/1)100% (19/19)100% (4/4)
access$1 (LocalSimpleMatcher): EventBus 100% (1/1)100% (3/3)100% (1/1)
access$2 (): Logger 100% (1/1)100% (2/2)100% (1/1)
createJoinHandler (EventHandler, Long, Set, LocalSimpleMatcher$HandleJoiner, ... 100% (1/1)100% (9/9)100% (1/1)
createJoiner (EventHandler, Long): LocalSimpleMatcher$HandleJoiner 100% (1/1)100% (33/33)100% (5/5)
getLock (): ReadWriteLock 100% (1/1)100% (3/3)100% (1/1)
setEventBus (EventBus): void 100% (1/1)100% (4/4)100% (2/2)
     
class LocalSimpleMatcher$HandleJoiner$1100% (1/1)100% (3/3)98%  (43/44)99%  (8.9/9)
consumeJoin (Object): void 100% (1/1)95%  (19/20)98%  (4.9/5)
LocalSimpleMatcher$HandleJoiner$1 (LocalSimpleMatcher$HandleJoiner, Inference... 100% (1/1)100% (19/19)100% (2/2)
consumeJoin (int): void 100% (1/1)100% (5/5)100% (2/2)

1package com.hammurapi.eventbus.local;
2 
3import java.lang.reflect.Array;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.Comparator;
9import java.util.Iterator;
10import java.util.List;
11import java.util.Set;
12import java.util.concurrent.ExecutorService;
13import java.util.concurrent.atomic.AtomicBoolean;
14import java.util.concurrent.atomic.AtomicLong;
15import java.util.concurrent.locks.Lock;
16import java.util.concurrent.locks.ReadWriteLock;
17import java.util.concurrent.locks.ReentrantLock;
18import java.util.concurrent.locks.ReentrantReadWriteLock;
19import java.util.logging.Level;
20import java.util.logging.Logger;
21 
22import com.hammurapi.common.Joiner;
23import com.hammurapi.common.Joiner.Collector;
24import com.hammurapi.eventbus.AbstractEventBus;
25import com.hammurapi.eventbus.AbstractEventBus.Handle;
26import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
27import com.hammurapi.eventbus.CompositeContext;
28import com.hammurapi.eventbus.DispatchNetworkException;
29import com.hammurapi.eventbus.EventBus;
30import com.hammurapi.eventbus.EventDispatchContext;
31import com.hammurapi.eventbus.EventDispatchException;
32import com.hammurapi.eventbus.EventDispatchJoinContext;
33import com.hammurapi.eventbus.EventHandler;
34import com.hammurapi.eventbus.EventHandlerBase.Mode;
35import com.hammurapi.eventbus.EventHandlerWrapper;
36import com.hammurapi.eventbus.EventStore;
37import com.hammurapi.eventbus.InferenceCommand;
38import com.hammurapi.eventbus.InferenceContext;
39import com.hammurapi.eventbus.InferencePolicy;
40import com.hammurapi.eventbus.PredicateChainingMatcher;
41import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
42import com.hammurapi.extract.Predicate;
43import com.hammurapi.extract.True;
44 
45/**
46 * This matcher does matching in a straightforward way - it evaluates handler predicates sequentially, it does not cache extracted values, does not 
47 * optimize event joins, and does not use executor service. The purpose of this matcher is to help with debugging and troubleshooting of handler/predicate 
48 * logic and to be a baseline of matching performance.
49 * 
50 * This matcher doesn't support snapshots.
51 * 
52 * @author Pavel Vlasov
53 *
54 * @param <E> Event type.
55 * @param <P> Handler priority type; priorities are compared to order matched handlers.
56 * @param <C> Handler context type.
57 * @param <S> Event store type.
60 */
61public class LocalSimpleMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> implements LocalMatcher<E,P,C,S> {
62        private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
63        private EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> eventBus;
64 
65        protected Collection<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>> wrappers = new ArrayList<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>>();
66        private ReadWriteLock lock = new ReentrantReadWriteLock(); 
67 
68        /**
69         * Lock for the inference network.
70         * @return
71         */
72        protected ReadWriteLock getLock() {
73                return lock;
74        }
75 
76        private AtomicLong handlerCounter = new AtomicLong(-1);
77 
78        protected Long nextId() {
79                LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) eventBus;
80                return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
81        }
82        
        /**
         * Registers an event handler with this matcher and returns its registration key.
         *
         * A handler of cardinality one is wrapped in a single {@code EventHandlerWrapper} which
         * delegates to the handler and performs the dispatch in {@code post()}. A handler of
         * higher cardinality gets one joiner and one join wrapper per input index
         * (see {@code createJoiner()} and {@code createJoinHandler()}).
         *
         * The wrapper collection is modified while holding the write lock.
         *
         * @param eventHandler Handler to register.
         * @return Registration key obtained from {@code nextId()}.
         * @throws DispatchNetworkException If the handler cardinality is less than one.
         */
83        @Override
84        public Long addHandler(final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler) {
85                if (eventHandler.getCardinality()<1) {
86                        throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
87                }
88                getLock().writeLock().lock();
89                try {
90                        final Long registrationKey = nextId();
                        // Single-element, read-only key set shared by all wrappers created for this handler.
91                        final Set<Long> keySet = Collections.unmodifiableSet(Collections.singleton(registrationKey));                
92                        if (eventHandler.getCardinality()==1) {
                                // Single-event handler: the wrapper below forwards the simple accessors to the handler.
93                                EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w = new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
94 
95                                        @Override
96                                        public boolean consumes() {
97                                                return eventHandler.consumes();
98                                        }
99 
100                                        @Override
101                                        public P getPriority() {
102                                                return eventHandler.getPriority();
103                                        }
104 
105                                        @Override
106                                        public int getCardinality() {
107                                                return eventHandler.getCardinality();
108                                        }
109 
110                                        @Override
111                                        public void reset() {
112                                                eventHandler.reset();
113                                        }
114 
115                                        @Override
116                                        public C getContext() {
117                                                return eventHandler.getContext();
118                                        }
119 
120                                        @Override
121                                        public boolean isOneOff() {
122                                                return eventHandler.isOneOff();
123                                        }
124 
125                                        @Override
126                                        public Mode getMode() {
127                                                return eventHandler.getMode();
128                                        }
129 
                                        // Flipped by the first post() call of a one-off handler so that later calls are ignored.
130                                        private AtomicBoolean fired=new AtomicBoolean(false);
131 
                                        /**
                                         * Dispatches the events behind the given handles to the wrapped handler.
                                         * Exceptions thrown during dispatch are logged and passed to the bus exception
                                         * handler and to the root handle (when present); they are not rethrown.
                                         */
132                                        @SuppressWarnings("unchecked")
133                                        @Override
134                                        public void post(
135                                                        EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
136                                                        InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,                                                        
137                                                        Handle<E,P,C,Long>... handles) {
                                                // Proceed if the handler is not one-off, or if this is the first call for a one-off handler.
138                                                if (!isOneOff() || !fired.getAndSet(true)) {
                                                        // Collect the events; give up silently as soon as one handle is no longer valid.
                                                        // NOTE(review): for a one-off handler 'fired' is already set at this point, so an
                                                        // invalid handle on the first call prevents the handler from ever running - confirm intended.
139                                                        E[] events = (E[]) Array.newInstance(eventBus.getEventType(), handles.length);
140                                                        for (int i=0; i<handles.length; ++i) {
141                                                                if (handles[i].isValid()) {
142                                                                        events[i] = handles[i].getEvent();
143                                                                } else {
144                                                                        return;
145                                                                }
146                                                        }
147                                                        
                                                        // With the AFTER_HANDLER policy the inference context is wrapped here and its
                                                        // inference commands are processed after the handler returned without exception.
148                                                        if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
149                                                                
150//                                                                if (inferenceContext.getInferenceCommandsQueue()!=null) {
151//                                                                        throw new IllegalArgumentException("Inference commands queue shall be null");
152//                                                                }
153 
154                                                                inferenceContext = inferenceContext.wrap();
155                                                        }
156                                                        
157                                                        LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
158                                                                                                                
159                                                        if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
                                                                // One handle - plain dispatch context; several handles - join context,
                                                                // which requires the incoming context to be an EventDispatchJoinContext.
160                                                                if (handles.length==1) {
161                                                                        dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
162                                                                                        inferenceContext,
163                                                                                        eventHandler, 
164                                                                                        registrationKey, 
165                                                                                        handles, 
166                                                                                        events,
167                                                                                        Mode.POST);
168                                                                } else {
169                                                                        dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
170                                                                                        inferenceContext,
171                                                                                        eventHandler, 
172                                                                                        registrationKey, 
173                                                                                        handles, 
174                                                                                        events, 
175                                                                                        (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
176                                                                                        Mode.POST);                                                
177                                                                }                                        
178                                                                
                                                                // The dispatch context is published through the thread-local for the duration
                                                                // of the handler call and cleared in the finally block.
179                                                                LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
180                                                                try {
                                                                        // Optional sanity check: re-evaluate the handler predicate right before posting.
181                                                                        if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
182                                                                                if (!getPredicate().extract(getContext(), null, events)) {
183                                                                                        throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
184                                                                                }
185                                                                        }
186                                                                        eventHandler.post(dispatchContext, events);
187                                                                        
188                                                                        // If no exception - post events
189                                                                        if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
190                                                                                inferenceContext.processInferenceCommands();
191                                                                        }                
192                                                                } catch (Exception e) {
193                                                                        logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
194                                                                        if (eventBus.getExceptionHandler()!=null) {
195                                                                                eventBus.getExceptionHandler().handleException(e);
196                                                                        }
197                                                                        if (inferenceContext.getRootHandle()!=null) {
198                                                                                inferenceContext.getRootHandle().handleException(e);
199                                                                        }
200                                                                } finally {
201                                                                        LocalEventDispatchContextImpl.threadContext.set(null);                                                
202                                                                }
                                                                // A one-off handler is unregistered after its single dispatch, whether or not it threw.
                                                                // NOTE(review): this removal sits inside the POST/BOTH branch, so a one-off handler
                                                                // in REMOVE mode is not unregistered here - confirm intended.
203                                                                if (isOneOff()) {
204                                                                        ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
205                                                                }
206                                                        }
207                                                        
                                                        // Handlers interested in removals get a remove listener attached to every handle.
208                                                        if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
209                                                                RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, eventBus.getEventType());
210                                                                for (Handle<E,P,C,Long> handle: handles) {
211                                                                        ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
212                                                                }
213                                                        }
214                                                }
215                                        }
216 
217                                        @Override
218                                        public Set<Long> getRegistrationKeys() {
219                                                return keySet;
220                                        }
221 
222                                        @Override
223                                        public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
224                                                return eventHandler;
225                                        }
226 
227                                        @Override
228                                        public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
229                                                // NOP                                                
230                                        }
231 
232                                        @Override
233                                        public Predicate<E, C> getPredicate() {
234                                                return eventHandler.getPredicate();
235                                        }
236                                        
237                                };
238                                wrappers.add(w);
239                        } else {
                                // Multi-event handler: one shared joiner, one wrapper per join input;
                                // all wrappers carry the same registration key.
240                                HandleJoiner joiner = createJoiner(eventHandler, registrationKey);
241                                for (int i=0; i<eventHandler.getCardinality(); ++i) {
242                                        wrappers.add(createJoinHandler(eventHandler, registrationKey, keySet, joiner, i));
243                                }
244                        }
245                        return registrationKey;
246                } finally {
247                        getLock().writeLock().unlock();
248                }
249        }
250 
        /**
         * Creates the wrapper which feeds one input of a multi-event (join) handler.
         *
         * The wrapper forwards the simple accessors to the handler. Its {@code post()} passes the
         * first handle to the joiner at position {@code idx}, and its predicate is always true.
         *
         * @param eventHandler Handler the wrapper is created for.
         * @param registrationKey Registration key of the handler. It is not read in this method body.
         * @param keySet Registration key set returned by {@code getRegistrationKeys()}.
         * @param joiner Joiner shared by all inputs of the handler.
         * @param idx Index of the joiner input served by the returned wrapper.
         * @return Wrapper for the input {@code idx}.
         */
251        private EventHandlerWrapper<E, P, C, Long, Handle<E, P, C, Long>, S> createJoinHandler(
252                        final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler,
253                        final Long registrationKey,
254                        final Set<Long> keySet,
255                        final HandleJoiner joiner, 
256                        final int idx) {
257                
258                return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
259 
260                        @Override
261                        public boolean consumes() {
262                                return eventHandler.consumes();
263                        }
264 
265                        @Override
266                        public P getPriority() {
267                                return eventHandler.getPriority();
268                        }
269 
270                        @Override
271                        public int getCardinality() {
272                                return eventHandler.getCardinality();
273                        }
274 
275                        @Override
276                        public void reset() {
277                                eventHandler.reset();
278                        }
279 
280                        @Override
281                        public C getContext() {
282                                return eventHandler.getContext();
283                        }
284 
285                        @Override
286                        public boolean isOneOff() {
287                                return eventHandler.isOneOff();
288                        }
289 
290                        @Override
291                        public Mode getMode() {
292                                return eventHandler.getMode();
293                        }
294 
                        /**
                         * Adds the first of the given handles as input number idx of the joiner.
                         * Exceptions are logged and passed to the bus exception handler and to the
                         * root handle (when present); they are not rethrown.
                         */
295                        @Override
296                        public void post(
297                                        EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
298                                        InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,                                                        
299                                        Handle<E,P,C,Long>... handles) {
300                                
                                // With the AFTER_HANDLER policy the inference context is wrapped and its
                                // commands are processed once the joiner input has been added.
301                                if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
302                                        
303//                                        if (inferenceContext.getInferenceCommandsQueue()!=null) {
304//                                                throw new IllegalArgumentException("Inference commands queue shall be null");
305//                                        }
306 
307                                        inferenceContext = inferenceContext.wrap();
308                                }
309                                                                
                                // Bundle the dispatch context, the handler context and the inference context for the joiner.
310                                CompositeContext<E, P, C, Long, Handle<E, P, C, Long>, S> compositeContext = new CompositeContext<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S>(
311                                                context,                                                 
312                                                eventHandler.getContext(), 
313                                                inferenceContext);
314                                
315                                try {
                                        // Only the first handle is used; any further handles are ignored here.
316                                        joiner.addInput(idx, handles[0], compositeContext);
317                                } catch (Exception e) {
318                                        logger.log(Level.SEVERE, "Join problem: "+e, e);
319                                        if (eventBus.getExceptionHandler()!=null) {
320                                                eventBus.getExceptionHandler().handleException(e);
321                                        }
322                                        if (inferenceContext.getRootHandle()!=null) {
323                                                inferenceContext.getRootHandle().handleException(e);
324                                        }
325                                } finally {                                        
                                        // NOTE(review): inference commands are processed in 'finally', i.e. also after a
                                        // failed addInput(); the single-event wrapper in addHandler() processes them only
                                        // when the handler completed without exception - confirm the difference is intended.
326                                        if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
327                                                inferenceContext.processInferenceCommands();
328                                        }                                                        
329                                }
330                                        
331                        }
332 
333                        @Override
334                        public Set<Long> getRegistrationKeys() {
335                                return keySet;
336                        }
337 
338                        @Override
339                        public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
340                                return eventHandler;
341                        }
342 
343                        @Override
344                        public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
345                                // NOP                                                
346                        }
347 
348                        @Override
349                        public Predicate<E, C> getPredicate() {
350                                /**
351                                 * Always returns true because predicate is evaluated in join().
352                                 */
353                                return True.getInstance();
354                        }
355                        
356                };
357        }
358 
359        @Override
360        public Iterable<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> match(E event, ExecutorService executorService) {                
361                @SuppressWarnings("unchecked")
362                E[] events = (E[]) Array.newInstance(eventBus.getEventType(), 1);
363                events[0] = event;
364                
365                List<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> ret = new ArrayList<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>();
366                getLock().readLock().lock();
367                try {
368                        for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
369                                if (w.getPredicate().extract(w.getContext(), null, events)) {
370                                        ret.add(w);
371                                }
372                        }
373                } finally {
374                        getLock().readLock().unlock();
375                }                        
376                
377                Collections.sort(ret, new Comparator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>() {
378 
379                        @Override
380                        public int compare(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o1, EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o2) {                                
381                                P p1 = o1.getPriority();
382                                P p2 = o2.getPriority();
383                                if (p2==null) {
384                                        return p1==null ? o2.hashCode() - o1.hashCode() : 1; 
385                                }
386                                if (p1==null) {
387                                        return -1;
388                                }
389                                return p2.compareTo(p1);
390                        }
391                        
392                });
393                
394                return ret;
395        }
396 
397        @Override
398        public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot) {
399                // TODO - Implement.
400                throw new UnsupportedOperationException("LocalSimpleMatcher does not support takeSnapshot() operation");
401        }
402 
403        @Override
404        public void removeHandlers(Iterable<Long> keys) {
405                getLock().writeLock().lock();
406                try {
407                        for (Long key: keys) {
408                                Iterator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> it = wrappers.iterator();
409                                Set<Long> rKeys = it.next().getRegistrationKeys();
410                                for (Long rKey: rKeys) {
411                                        if (rKey.equals(key)) {
412                                                it.remove();
413                                                break;
414                                        }
415                                }
416                        }
417                } finally {
418                        getLock().writeLock().unlock();
419                }                
420        }
421 
422        @Override
423        public void reset() {
424                getLock().readLock().lock();
425                try {
426                        for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
427                                w.reset();
428                        }
429                } finally {
430                        getLock().readLock().unlock();
431                }                
432        }
433 
434        @Override
435        public void manageHandlers(HandlerManager<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handlerManager) {
436                getLock().writeLock().lock();
437                try {
438                        handlerManager.manageHandlers(this);
439                } finally {
440                        getLock().writeLock().unlock();
441                }
442        }
443 
444        @Override
445        public void setEventBus(EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> bus) {
446                this.eventBus = bus;                
447        }
448        
449        private class HandleJoiner extends Joiner<Handle<E,P,C,Long>, CompositeContext<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>, Object> {
450                
451                Lock lock = new ReentrantLock();
452 
453                private EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
454                private Collector<Handle<E,P,C,Long>>[] ic;
455 
456                private Long registrationKey;
457 
458                public HandleJoiner(
459                                Collector<Handle<E,P,C,Long>>[] inputCollectors, 
460                                Class<Handle<E,P,C,Long>> inputType, 
461                                EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler,
462                                Long registrationKey) {
463                        
464                        super(inputCollectors, inputType, false);
465                        this.ic = inputCollectors;
466                        this.handler = handler;
467                        this.registrationKey = registrationKey;
468                }
469 
470                @Override
471                protected void startJoin() {
472//                        System.out.println("Lock by "+Thread.currentThread());
473                        lock.lock();                
474                }
475                
476                @Override
477                protected void endJoin() {
478//                        System.out.println("UnLock by "+Thread.currentThread());
479                        lock.unlock();                
480                }
481                
482                private AtomicBoolean fired=new AtomicBoolean(false);
483                
484                @Override
485                protected Object join(
486                                final Handle<E,P,C,Long>[] inputs, 
487                                final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context, 
488                                final Joiner.InputConsumer consumer, 
489                                int activator) throws Exception {
490                        
491                        if (!handler.isOneOff() || !fired.getAndSet(true)) {                                
492                                final E[] events = (E[]) Array.newInstance(eventBus.getEventType(), inputs.length);
493                                for (int i=0; i<inputs.length; ++i) {
494                                        if (inputs[i].isValid()) {
495                                                events[i] = inputs[i].getEvent();
496                                        } else {
497                                                return null;
498                                        }
499                                }
500                                
501                                if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
502                                        return null;
503                                }
504                                                                
505                                if (handler.getMode()==Mode.POST || handler.getMode()==Mode.BOTH) {
506                                        LocalEventDispatchContextImpl<E,P,C,S> dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
507                                                        context.getInferenceContext(),
508                                                        handler, 
509                                                        registrationKey, 
510                                                        inputs, 
511                                                        events, 
512                                                        (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context.getEventDispatchContext(),
513                                                        Mode.POST) {
514                                                                                        
515                                        @Override
516                                        public void consumeJoin(int index) {
517                                                consumer.consume(index);
518                                        }
519 
520                                        @Override
521                                        public void consumeJoin(E event) {
522                                                for (int i=0; i<events.length; ++i) {
523                                                        if (event==events[i]) {
524                                                                consumeJoin(i);
525                                                                break;
526                                                        }
527                                                }
528                                        }
529                                };
530                                                                
531                                LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
532                                        try {
533                                                if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
534                                                        if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
535                                                                throw new EventDispatchException("Handler predicate evaluated to false before post: "+handler.getPredicate()+", handler "+handler);
536                                                        }
537                                                }
538                                                handler.post(dispatchContext, events);
539                                        } catch (Exception e) {
540                                                logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
541                                                if (eventBus.getExceptionHandler()!=null) {
542                                                        eventBus.getExceptionHandler().handleException(e);
543                                                }
544                                                if (context.getInferenceContext().getRootHandle()!=null) {
545                                                        context.getInferenceContext().getRootHandle().handleException(e);
546                                                }
547                                        } finally {
548                                                LocalEventDispatchContextImpl.threadContext.set(null);                                                
549                                        }
550                                        if (handler.isOneOff()) {
551                                                ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
552                                                                                                
553                                        }
554                                }
555                                
556                                if (handler.getMode()==Mode.REMOVE || handler.getMode()==Mode.BOTH) {
557                                        RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(handler, registrationKey, inputs, eventBus.getEventType());
558                                        for (Handle<E,P,C,Long> handle: inputs) {
559                                                ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
560                                        }
561                                }
562                                
563                        }
564                        return null;
565                }
566                
567                @Override
568                public String toString() {
569                        return "Simple HandleJoiner [handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
570                };
571                
572        }                
573        
574        @SuppressWarnings("unchecked")
575        protected HandleJoiner createJoiner(EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler, Long registrationKey) {
576                
577                Collector<Handle<E,P,C,Long>>[] inputCollectors = new Collector[eventHandler.getCardinality()];
578                for (int i=0; i<inputCollectors.length; ++i) {
579                        inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>>(new ArrayList<Handle<E,P,C,Long>>() /* createCollection()*/) {
580                                
581                                /**
582                                 * Override to replace handles with facades of appropriate type.
583                                 */
584                                public boolean add(AbstractEventBus.Handle<E,P,C,Long> handle) {
585                                        if (handle instanceof MasterHandle) {
586                                                return super.add(new FacadeHandle((MasterHandle) handle, ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
587                                        } else if (handle instanceof FacadeHandle) {
588                                                return super.add(new FacadeHandle(((FacadeHandle) handle).getMaster(), ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
589                                        } else {
590                                                throw new IllegalArgumentException("Unexpected handle type: "+handle.getClass());
591                                        }
592                                };
593                                
594                                public boolean remove(AbstractEventBus.Handle<E,P,C,Long> obj) {
595//                                        System.out.println("Before remove: "+this);
596                                        Iterator<Handle<E, P, C, Long>> it = iterator();
597                                        while (it.hasNext()) {
598                                                if (obj.getId().equals(it.next().getId())) {
599                                                        it.remove();
600                                                        return true;
601                                                }
602                                        }
603                                        
604                                        return false;
605                                };
606                        };
607                }
608                Class inputClass = Handle.class;
609                return new HandleJoiner(inputCollectors, inputClass, eventHandler, registrationKey);                
610        }
611 
612//        private Collection<Handle<E, P, C, Long>> createCollection() {
613//                return InvocationRecordingProxyFactory.wrap(Collection.class, new ArrayList<Handle<E,P,C,Long>>(), new ProblemListener() {
614//
615//                        @Override
616//                        public void onProblem(Throwable th, List<InvocationRecord> records) {
617//                                Object[] ra = records.toArray();
618//                                for (int i=0; i<ra.length; ++i) {
619//                                        System.out.println(i+" "+ra[i]);
620//                                }
621//                                
622//                                th.printStackTrace();
623//                        }
624//                        
625//                });
626//        }
627        
628        
629}

[all classes][com.hammurapi.eventbus.local]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov