EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus.local]

COVERAGE SUMMARY FOR SOURCE FILE [MasterHandle.java]

name | class, % | method, % | block, % | line, %
MasterHandle.java | 100% (4/4) | 73% (36/49) | 75% (612/815) | 75% (131.7/176)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class MasterHandle$3100% (1/1)55%  (6/11)51%  (50/98)48%  (11/23)
getInferenceChainLength (): int 0%   (0/1)0%   (0/2)0%   (0/1)
getRootHandle (): ExceptionHandler 0%   (0/1)0%   (0/3)0%   (0/1)
processInferenceCommands (): void 0%   (0/1)0%   (0/19)0%   (0/5)
setRootHandle (ExceptionHandler): void 0%   (0/1)0%   (0/18)0%   (0/4)
wrap (): InferenceContext 0%   (0/1)0%   (0/6)0%   (0/1)
MasterHandle$3 (MasterHandle, TrackingExecutorService, Queue): void 100% (1/1)100% (15/15)100% (3/3)
createNext (): InferenceContext 100% (1/1)100% (6/6)100% (1/1)
getBus (): EventBus 100% (1/1)100% (4/4)100% (1/1)
getExecutorService (): TrackingExecutorService 100% (1/1)100% (3/3)100% (1/1)
getInferenceCommandsQueue (): Queue 100% (1/1)100% (3/3)100% (1/1)
postInferenceCommand (InferenceCommand): void 100% (1/1)100% (19/19)100% (4/4)
     
class MasterHandle$2100% (1/1)64%  (7/11)64%  (63/98)61%  (14/23)
getInferenceChainLength (): int 0%   (0/1)0%   (0/2)0%   (0/1)
getRootHandle (): ExceptionHandler 0%   (0/1)0%   (0/3)0%   (0/1)
processInferenceCommands (): void 0%   (0/1)0%   (0/19)0%   (0/5)
wrap (): InferenceContext 0%   (0/1)0%   (0/6)0%   (0/1)
setRootHandle (ExceptionHandler): void 100% (1/1)72%  (13/18)75%  (3/4)
MasterHandle$2 (MasterHandle, TrackingExecutorService, Queue): void 100% (1/1)100% (15/15)100% (3/3)
createNext (): InferenceContext 100% (1/1)100% (6/6)100% (1/1)
getBus (): EventBus 100% (1/1)100% (4/4)100% (1/1)
getExecutorService (): TrackingExecutorService 100% (1/1)100% (3/3)100% (1/1)
getInferenceCommandsQueue (): Queue 100% (1/1)100% (3/3)100% (1/1)
postInferenceCommand (InferenceCommand): void 100% (1/1)100% (19/19)100% (4/4)
     
class MasterHandle100% (1/1)84%  (21/25)80%  (489/609)82%  (107.7/131)
getValidators (): Predicate [] 0%   (0/1)0%   (0/3)0%   (0/1)
isDerivedFrom (Object): boolean 0%   (0/1)0%   (0/20)0%   (0/4)
join (long): boolean 0%   (0/1)0%   (0/9)0%   (0/3)
setDirectPost (): void 0%   (0/1)0%   (0/4)0%   (0/2)
checkExceptions (): void 100% (1/1)56%  (25/45)64%  (7/11)
remove (): void 100% (1/1)71%  (51/72)75%  (12/16)
update (): void 100% (1/1)75%  (63/84)78%  (14/18)
addRemoveListener (RemoveListener): void 100% (1/1)82%  (14/17)94%  (2.8/3)
getDerivations (): Collection 100% (1/1)82%  (14/17)91%  (1.8/2)
removeDerivation (Derivation, InferenceContext): void 100% (1/1)88%  (30/34)95%  (4.8/5)
addDerivation (Long, EventHandler, AbstractEventBus$Handle []): void 100% (1/1)89%  (24/27)98%  (4.9/5)
isValid (): boolean 100% (1/1)90%  (26/29)96%  (5.8/6)
remove (boolean, InferenceContext): void 100% (1/1)93%  (77/83)98%  (15.7/16)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
MasterHandle (LocalEventBusBase, Object, TrackingExecutorService, Long, boole... 100% (1/1)100% (64/64)100% (17/17)
access$2 (MasterHandle): LocalEventBusBase 100% (1/1)100% (3/3)100% (1/1)
addFacade (FacadeHandle): void 100% (1/1)100% (6/6)100% (2/2)
getEvent (): Object 100% (1/1)100% (21/21)100% (3/3)
getId (): Long 100% (1/1)100% (3/3)100% (1/1)
handleException (Exception): void 100% (1/1)100% (6/6)100% (2/2)
isDirectPost (): boolean 100% (1/1)100% (3/3)100% (1/1)
join (): void 100% (1/1)100% (9/9)100% (4/4)
setStoreHandle (Store$Handle): void 100% (1/1)100% (12/12)100% (4/4)
toObservable (Object): Observable 100% (1/1)100% (12/12)100% (1/1)
toString (): String 100% (1/1)100% (21/21)100% (1/1)
     
class MasterHandle$1100% (1/1)100% (2/2)100% (10/10)100% (4/4)
MasterHandle$1 (MasterHandle): void 100% (1/1)100% (6/6)100% (2/2)
update (Object, Object []): void 100% (1/1)100% (4/4)100% (2/2)

1package com.hammurapi.eventbus.local;
2 
3import java.lang.ref.Reference;
4import java.lang.ref.WeakReference;
5import java.util.ArrayList;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.HashSet;
9import java.util.Iterator;
10import java.util.LinkedList;
11import java.util.Queue;
12import java.util.concurrent.ConcurrentLinkedQueue;
13import java.util.logging.Level;
14import java.util.logging.Logger;
15 
16import com.hammurapi.common.ExceptionHandler;
17import com.hammurapi.common.Observable;
18import com.hammurapi.common.Observer;
19import com.hammurapi.common.concurrent.TrackingExecutorService;
20import com.hammurapi.eventbus.AbstractEventBus;
21import com.hammurapi.eventbus.AbstractEventBus.Handle;
22import com.hammurapi.eventbus.Derivation;
23import com.hammurapi.eventbus.EventBus;
24import com.hammurapi.eventbus.EventDispatchException;
25import com.hammurapi.eventbus.EventDispatchMultiCauseException;
26import com.hammurapi.eventbus.EventHandler;
27import com.hammurapi.eventbus.EventStore;
28import com.hammurapi.eventbus.InferenceCommand;
29import com.hammurapi.eventbus.InferenceContext;
30import com.hammurapi.eventbus.InferencePolicy;
31import com.hammurapi.eventbus.PostCommand;
32import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
33import com.hammurapi.extract.Predicate;
34import com.hammurapi.store.Store;
35 
36class MasterHandle<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> implements LocalHandle<E,P,C,S> {                
37        private Long id;
38        private Collection<Derivation<E,P,C>> derivations = new HashSet<Derivation<E,P,C>>(); // Derivations of this handle
39        private Collection<FacadeHandle<E,P,C,S>> facades = new ConcurrentLinkedQueue<FacadeHandle<E,P,C,S>>();
40        private TrackingExecutorService joinDelegate;
41        private Store.Handle<AbstractEventBus.Handle<E, P, C, Long>, E, S> storeHandle;
42        private LocalEventBusBase<E, P, C, S> bus;
43        private E event;
44        boolean directPost;
45        private Predicate<E, S>[] validators;        
46        private Collection<RemoveListener<E,P,C,S>> removeListeners = new ArrayList<RemoveListener<E,P,C,S>>();
47        
48        private static final Logger logger = Logger.getLogger(MasterHandle.class.getName());
49                
50//        private ReadWriteLock handleLock = new ReentrantReadWriteLock();
51        
52        private final Observer<E> observer = new Observer<E>() {
53 
54                @Override
55                public void update(E obj, Object... args) {
56                        MasterHandle.this.update();
57                }
58                
59        };
60 
61        void addFacade(FacadeHandle<E,P,C,S> facade) {
62                facades.add(facade);
63        }
64        
65        private Reference<Observable<E>> observableRef;
66        
67        MasterHandle(
68                        LocalEventBusBase<E, P, C, S> bus, 
69                        E event, 
70                        TrackingExecutorService joinDelegate, 
71                        Long id, 
72                        boolean directPost,
73                        Predicate<E, S>[] validators) {
74                
75                this.event = event; // Temporarily until setStoreHandle is invoked if store has primary key extractor.
76                this.id = id;
77                this.joinDelegate = joinDelegate;
78                this.bus = bus;
79                this.directPost = directPost;
80                this.validators = validators;
81                
82                com.hammurapi.common.Observable<E> observable = toObservable(event);
83                if (observable!=null) {
84                        observable.addObserver(observer);
85                        observableRef = new WeakReference<Observable<E>>(observable);
86                }
87                
88        }
89        
90        @Override
91        public Long getId() {
92                return id;
93        }
94 
95        @Override
96        public Collection<Derivation<E,P,C>> getDerivations() {
97                synchronized (derivations) {
98                        return Collections.unmodifiableSet(new HashSet<Derivation<E,P,C>>(derivations));
99                }
100        }
101        
102        public void addDerivation(Long handlerId, EventHandler<E,P,C,?,?> handler, AbstractEventBus.Handle<E,P,C,Long>[] inputs) {
103                synchronized (derivations) {
104                        DerivationImpl<E,P,C,S> derivation = new DerivationImpl<E,P,C,S>(handlerId, handler, inputs);
105                        derivations.add(derivation);
106                        derivation.setMasterHandle(this);
107                }
108        }
109 
110        @Override
111        public E getEvent() {
112                if (storeHandle==null || bus.getStore().getPrimaryKeyExtractor()==null) {
113                        return event;
114                }
115                return storeHandle.isValid() ? storeHandle.getPrimaryKey() : null;
116        }                                
117 
118        @Override
119        public void remove() {
120                try {
121                        final Queue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>>(); 
122                        final TrackingExecutorService hes = bus.createExecutorService(false, "Remove executor");
123                        
124                        InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext = new InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S>() {
125 
126                                @Override
127                                public int getInferenceChainLength() {
128                                        return 0;
129                                }
130 
131                                @Override
132                                public TrackingExecutorService getExecutorService() {
133                                        return hes;
134                                }
135                                
136                                private ExceptionHandler rootHandle = MasterHandle.this;
137 
138                                @Override
139                                public ExceptionHandler getRootHandle() {
140                                        return rootHandle;
141                                }                                                                
142 
143                                @Override
144                                public void setRootHandle(ExceptionHandler rootHandle) {
145                                        if (rootHandle!=this.rootHandle && this.rootHandle!=MasterHandle.this) {
146                                                throw new IllegalStateException("Root handle already set");
147                                        }
148                                        this.rootHandle = rootHandle;
149                                }
150 
151                                @Override
152                                public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
153                                        return MasterHandle.this.bus;
154                                }
155 
156                                @Override
157                                public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createNext() {
158                                        return bus.createInferenceContext(this);
159                                }
160 
161                                @Override
162                                public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrap() {
163                                        return bus.wrapInferenceContext(this);
164                                }
165 
166                                @Override
167                                public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
168                                        if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
169                                                MasterHandle.this.bus.processInferenceCommand(command);
170                                        } else {
171                                                inferenceCommandsQueue.add(command);
172                                        }
173                                }
174 
175                                @Override
176                                public void processInferenceCommands() {
177                                        Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
178                                        while (cit.hasNext()) {
179                                                MasterHandle.this.bus.processInferenceCommand(cit.next());
180                                                cit.remove();
181                                        }
182                                }
183 
184                                @Override
185                                public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
186                                        return inferenceCommandsQueue;
187                                }
188                                
189                        };
190                        
191                        remove(false, inferenceContext);
192                        if (hes!=null) {
193                                hes.join();
194                        }
195                        
196                        while (!inferenceCommandsQueue.isEmpty()) {
197                                for (InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
198                                        bus.processInferenceCommand(conclusion);
199                                }
200                                if (hes!=null) {
201                                        hes.join();
202                                }
203                        }
204                } catch (Exception e) {
205                        logger.log(Level.SEVERE, "Remove error: "+e, e);
206                        if (bus.getExceptionHandler()!=null) {
207                                bus.getExceptionHandler().handleException(e);
208                        }
209                }
210        }
211        
212        /**
213         * When derivation is invalidated, it invokes this method to notify the handle.
214         * If handle is not direct post (derived) and there are no good derivations - remove handle.
215         * @param derivation
216         */
217        void removeDerivation(Derivation<E,P,C> derivation, InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext) {
218                boolean toRemove;
219                synchronized (derivations) {
220                        toRemove = derivations.remove(derivation) && !directPost && derivations.isEmpty();
221                }
222                
223                if (toRemove) {
224                        remove(false, inferenceContext);
225                }
226        }
227 
228        @Override
229        public boolean isDerivedFrom(E event) {
230                for (Derivation<E,P,C> dd: getDerivations()) {
231                        if (dd.isDerivedFrom(event)) {
232                                return true;
233                        }
234                }
235                return false;
236        }
237 
238        @Override
239        public boolean isValid() {
240                if (!storeHandle.isValid()) {                        
241                        return false;
242                }
243                
244                if (directPost) {
245                        return true;
246                }
247 
248                synchronized (derivations) {
249                        return !derivations.isEmpty(); // When derivation is invalidated it is removed from the derivation list.
250                }
251        }
252 
253        @Override
254        public void update() {
255                try {
256                        final Queue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>>(); 
257                        final TrackingExecutorService hes = bus.createExecutorService(false, "Remove executor");
258                        
259                        InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext = new InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S>() {
260 
261                                @Override
262                                public int getInferenceChainLength() {
263                                        return 0;
264                                }
265 
266                                @Override
267                                public TrackingExecutorService getExecutorService() {
268                                        return hes;
269                                }
270                                
271                                private ExceptionHandler rootHandle = MasterHandle.this;
272 
273                                @Override
274                                public ExceptionHandler getRootHandle() {
275                                        return rootHandle;
276                                }                                                                
277 
278                                @Override
279                                public void setRootHandle(ExceptionHandler rootHandle) {
280                                        if (rootHandle!=this.rootHandle && this.rootHandle!=MasterHandle.this) {
281                                                throw new IllegalStateException("Root handle already set");
282                                        }
283                                        this.rootHandle = rootHandle;
284                                }
285 
286                                @Override
287                                public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
288                                        return MasterHandle.this.bus;
289                                }
290 
291                                @Override
292                                public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createNext() {
293                                        return bus.createInferenceContext(this);
294                                }
295 
296                                @Override
297                                public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrap() {
298                                        return bus.wrapInferenceContext(this);
299                                }
300 
301                                @Override
302                                public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
303                                        if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
304                                                MasterHandle.this.bus.processInferenceCommand(command);
305                                        } else {
306                                                inferenceCommandsQueue.add(command);
307                                        }
308                                }
309 
310                                @Override
311                                public void processInferenceCommands() {
312                                        Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
313                                        while (cit.hasNext()) {
314                                                MasterHandle.this.bus.processInferenceCommand(cit.next());
315                                                cit.remove();
316                                        }
317                                }
318 
319                                @Override
320                                public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
321                                        return inferenceCommandsQueue;
322                                }
323                                
324                        };
325                        
326                        remove(true, inferenceContext);
327                        
328                        InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> postCommand = new PostCommand<E, P, C, Long, Handle<E, P, C, Long>, S>(this, null, null, null, inferenceContext);
329                        inferenceContext.postInferenceCommand(postCommand);                        
330                        
331                        if (hes!=null) {
332                                hes.join();
333                        }
334                        
335                        while (!inferenceCommandsQueue.isEmpty()) {
336                                for (InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
337                                        bus.processInferenceCommand(conclusion);
338                                }
339                                if (hes!=null) {
340                                        hes.join();
341                                }
342                        }
343                } catch (Exception e) {
344                        logger.log(Level.SEVERE, "Remove error: "+e, e);
345                        if (bus.getExceptionHandler()!=null) {
346                                bus.getExceptionHandler().handleException(e);
347                        }
348                }                
349        }
350 
351        @Override
352        public void join() throws InterruptedException {
353                if (joinDelegate!=null) {
354                        joinDelegate.join();
355                }
356                checkExceptions();
357        }
358 
359        private void checkExceptions() {
360                if (exceptionCollector.isEmpty()) {
361                        return;
362                }
363                
364                if (exceptionCollector.size()==1) {
365                        Exception e = exceptionCollector.iterator().next();
366                        exceptionCollector.clear();
367                        if (e instanceof EventDispatchException) {
368                                throw (EventDispatchException) e;
369                        }
370                        throw new EventDispatchException(e);
371                }
372                
373                Collection<Exception> ce = Collections.unmodifiableCollection(new ArrayList<Exception>(exceptionCollector));
374                exceptionCollector.clear();
375                throw new EventDispatchMultiCauseException(ce);
376        }
377 
378        @Override
379        public boolean join(long timeout) throws InterruptedException {
380                boolean ret = joinDelegate.join(timeout);
381                checkExceptions();
382                return ret;
383        }
384        
385        @Override
386        public <H extends AbstractEventBus.Handle<E,P,C,Long>, SS extends EventStore<E, P, C, H, SS>> void setStoreHandle(Store.Handle<H, E, SS> storeHandle) {
387                this.storeHandle = (Store.Handle<Handle<E, P, C, Long>, E, S>) storeHandle;                
388                if (bus.getStore().getPrimaryKeyExtractor()!=null) {
389                        this.event = null;
390                }
391        }
392 
393        Predicate<E, S>[] getValidators() {
394                return validators;
395        }
396        
397        private Observable<E> toObservable(E obj) {
398                return bus.observableConverter==null ? null : bus.observableConverter.convert(obj);
399        }
400 
401        @Override
402        public String toString() {
403                return "MasterHandle [getId()=" + getId() + ", getEvent()="        + getEvent() + ", isValid()=" + isValid() + "]";
404        }
405        
406        private Collection<Exception> exceptionCollector = Collections.synchronizedCollection(new LinkedList<Exception>());
407 
408        @Override
409        public void handleException(Exception e) {
410                exceptionCollector.add(e);
411        }
412 
413        public boolean isDirectPost() {
414                return directPost;
415        }
416        
417        @Override
418        public void setDirectPost() {
419                directPost = true;
420        }
421        
422        public void addRemoveListener(RemoveListener<E, P, C, S> removeListener) {
423                synchronized (removeListeners) {
424                        removeListeners.add(removeListener);
425                }                                
426        }
427        
428        @Override
429        public void remove(boolean forUpdate, InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> inferenceContext) {
430                
431                // Invalidate all derivatives and facades and clears all derivations.
432                for (FacadeHandle<E,P,C,S> fh: facades) {
433                        fh.invalidateFacade(inferenceContext);
434                }
435                facades = new ConcurrentLinkedQueue<FacadeHandle<E,P,C,S>>();
436                
437                synchronized (derivations) {
438                        derivations.clear();
439                }
440                
441                // Fire remove handlers.
442                synchronized (removeListeners) {
443                        for (RemoveListener<E,P,C,S> rl: removeListeners) {
444                                rl.postRetractCommand(inferenceContext);
445                        }
446                        removeListeners.clear();
447                }                                
448                
449                // Remove observer if not part of update
450                if (!forUpdate) {
451                        if (observableRef!=null) {
452                                com.hammurapi.common.Observable<E> observable = observableRef.get();
453                                if (observable!=null) {
454                                        observable.deleteObserver(observer);
455                                }
456                        }
457                        storeHandle.remove();                        
458                }
459        }                
460        
461}

[all classes][com.hammurapi.eventbus.local]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov