EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus]

COVERAGE SUMMARY FOR SOURCE FILE [AbstractEventBus.java]

nameclass, %method, %block, %line, %
AbstractEventBus.java50%  (2/4)51%  (21/41)68%  (698/1026)69%  (156.5/227)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class AbstractEventBus$10%   (0/1)0%   (0/6)0%   (0/44)0%   (0/10)
AbstractEventBus$1 (AbstractEventBus, Reference, Predicate): void 0%   (0/1)0%   (0/12)0%   (0/2)
compareTo (Extractor): ComparisonResult 0%   (0/1)0%   (0/2)0%   (0/1)
extract (EventStore, Map, AbstractEventBus$Handle []): Boolean 0%   (0/1)0%   (0/18)0%   (0/4)
getCost (): double 0%   (0/1)0%   (0/4)0%   (0/1)
isContextDependent (): boolean 0%   (0/1)0%   (0/4)0%   (0/1)
parameterIndices (): Set 0%   (0/1)0%   (0/4)0%   (0/1)
     
class AbstractEventBus$CreateMasterHandleResult0%   (0/1)0%   (0/3)0%   (0/18)0%   (0/6)
AbstractEventBus$CreateMasterHandleResult (AbstractEventBus, AbstractEventBus... 0%   (0/1)0%   (0/12)0%   (0/4)
getHandle (): AbstractEventBus$Handle 0%   (0/1)0%   (0/3)0%   (0/1)
isNew (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
     
class AbstractEventBus$EventBusStatsImpl100% (1/1)33%  (4/12)41%  (49/119)38%  (10/26)
getConclusions (): long 0%   (0/1)0%   (0/4)0%   (0/1)
getConclusionsDelta (): long 0%   (0/1)0%   (0/11)0%   (0/2)
getHandlersFired (): long 0%   (0/1)0%   (0/4)0%   (0/1)
getHandlersFiredDelta (): long 0%   (0/1)0%   (0/11)0%   (0/2)
getPosts (): long 0%   (0/1)0%   (0/4)0%   (0/1)
getPostsDelta (): long 0%   (0/1)0%   (0/11)0%   (0/2)
getStart (): Date 0%   (0/1)0%   (0/3)0%   (0/1)
reset (): void 0%   (0/1)0%   (0/22)0%   (0/6)
AbstractEventBus$EventBusStatsImpl (AbstractEventBus): void 100% (1/1)100% (41/41)100% (8/8)
AbstractEventBus$EventBusStatsImpl (AbstractEventBus, AbstractEventBus$EventB... 100% (1/1)100% (4/4)100% (1/1)
children (): Iterable 100% (1/1)100% (2/2)100% (1/1)
getName (): String 100% (1/1)100% (2/2)100% (1/1)
     
class AbstractEventBus100% (1/1)85%  (17/20)77%  (649/845)79%  (146.5/186)
join (long): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
manageHandlers (Matcher$HandlerManager): void 0%   (0/1)0%   (0/5)0%   (0/2)
setExceptionHandler (ExceptionHandler): void 0%   (0/1)0%   (0/4)0%   (0/2)
takeSnapshot (AbstractEventBus$Snapshot): void 100% (1/1)53%  (18/34)67%  (6/9)
post (Object, Predicate []): AbstractEventBus$Handle 100% (1/1)55%  (88/160)58%  (18.7/32)
processInferenceCommand (InferenceCommand): AbstractEventBus$Handle 100% (1/1)81%  (263/325)85%  (58/68)
createMasterHandle (PostCommand): AbstractEventBus$Handle 100% (1/1)82%  (103/125)80%  (18.4/23)
workingMemorySnapshot (AbstractEventBus$StateSnapshot): void 100% (1/1)88%  (53/60)71%  (6.4/9)
getDerivations (Object): Collection 100% (1/1)92%  (37/40)89%  (8.9/10)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
AbstractEventBus (Class, EventStore, InferencePolicy, InferenceFilter, StatsC... 100% (1/1)100% (41/41)100% (13/13)
addHandler (EventHandler): Object 100% (1/1)100% (5/5)100% (1/1)
getEventType (): Class 100% (1/1)100% (3/3)100% (1/1)
getExceptionHandler (): ExceptionHandler 100% (1/1)100% (3/3)100% (1/1)
getInferencePolicy (): InferencePolicy 100% (1/1)100% (3/3)100% (1/1)
getStore (): EventStore 100% (1/1)100% (3/3)100% (1/1)
join (): void 100% (1/1)100% (7/7)100% (3/3)
removeHandlers (Iterable): void 100% (1/1)100% (5/5)100% (2/2)
removeHandlers (Object []): void 100% (1/1)100% (5/5)100% (2/2)
reset (): void 100% (1/1)100% (7/7)100% (3/3)

1package com.hammurapi.eventbus;
2 
3import java.lang.ref.Reference;
4import java.lang.ref.WeakReference;
5import java.util.Arrays;
6import java.util.Collection;
7import java.util.Collections;
8import java.util.Date;
9import java.util.HashSet;
10import java.util.List;
11import java.util.Map;
12import java.util.Queue;
13import java.util.Set;
14import java.util.concurrent.ExecutorService;
15import java.util.concurrent.TimeUnit;
16import java.util.concurrent.atomic.AtomicLong;
17import java.util.concurrent.locks.Lock;
18import java.util.logging.Level;
19import java.util.logging.Logger;
20 
21import com.hammurapi.common.ExceptionHandler;
22import com.hammurapi.common.Util;
23import com.hammurapi.common.concurrent.TrackingExecutorService;
24import com.hammurapi.eventbus.Matcher.HandlerManager;
25import com.hammurapi.eventbus.monitoring.EventBusStats;
26import com.hammurapi.eventbus.monitoring.Stats;
27import com.hammurapi.eventbus.monitoring.StatsCollector;
28import com.hammurapi.extract.ComparisonResult;
29import com.hammurapi.extract.Extractor;
30import com.hammurapi.extract.Predicate;
31import com.hammurapi.store.Store;
32 
33/**
34 * Event bus dispatches events to event handlers.
35 * @author Pavel Vlasov.
36 * @param <E> Event type.
37 * @param <P> Priority type.
38 * @param <C> Context type.
39 * @param <K> Handler registration key.
40 */
41public abstract class AbstractEventBus<E, P extends Comparable<P>, C, K, H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E,P,C,H,S>> implements EventBus<E, P, C, K, H, S> {
42        protected static final Logger logger = Logger.getLogger(AbstractEventBus.class.getName());
43                
44        /**
45         * Interface to output bus structure for troubleshooting.
46         * @author Pavel Vlasov
47         * @param <K> Registration key type.
48         */
49        public interface Snapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> {
50                
51                /**
52                 * Invoked before any other methods.
53                 */
54                void start();
55                
56                /**
57                 * Invoked at the end of taking snapshot
58                 * @param success True if there've been no exceptions. 
59                 */
60                void end(boolean success);
61                
62                void handler(K id, EventHandler<E, P, C, H, S> eventHandler);
63                
64                void predicateNode(K id, Predicate<E, C> predicate, Collection<K> trueChildren, Collection<K> trueHandlers, Collection<K> falseChildren, Collection<K> falseHandlers, boolean isRoot);
65                
66                void joinInput(K id, K joinNodeId, int index);
67                
68                /**
69                 * 
70                 * @param id Join node ID.
71                 * @param predicates Node predicates.
72                 * @param outputIndices Node output indices.
73                 * @param eventHandlerId Handler ID if this node is a final join node.
74                 * @param nextJoinNodeId Next join node id for intermediary nodes.
75                 */
76                void joinNode(
77                                K id, 
78                                Predicate<E, C> predicate, 
79                                Set<Integer> outputIndices, 
80                                K eventHandlerId, 
81                                K nextJoinNodeId);
82                
83        }
84        
85        /**
86         * This snapshot also outputs bus state.
87         * @author Pavel Vlasov
88         *
89         * @param <E>
90         * @param <P>
91         * @param <C>
92         * @param <K>
93         * @param <H>
94         * @param <S>
95         */
96        public interface StateSnapshot<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> extends Snapshot<E, P, C, K, H, S> {
97                
98                void event(K id, E event, boolean directPost);
99                
100                void derivation(K eventId, K eventHandlerId, List<K> inputs);
101                
102                void joinInputCollector(K joinNodeId, int[] indices, Collection<K[]> elements);
103                
104        }
105        
106 
107        private Class<E> eventType;
108 
109        private InferencePolicy inferencePolicy;
110 
111        private TimeUnit statsTimeUnit;
112 
113        private StatsCollector statsCollector;
114        
115        private class EventBusStatsImpl implements EventBusStats {
116                
117                AtomicLong internalPosts = new AtomicLong();
118                
119                AtomicLong posts = new AtomicLong();
120                
121                AtomicLong postsBaseline = new AtomicLong();                
122                AtomicLong conclusionsBaseline = new AtomicLong();        
123                
124                private Date start = new Date();
125 
126                @Override
127                public String getName() {
128                        return "Event Bus";
129                }
130 
131                @Override
132                public void reset() {
133                        internalPosts.set(0);
134                        posts.set(0);
135                        postsBaseline.set(0);
136                        conclusionsBaseline.set(0);
137                        start = new Date();
138                }
139 
140                @Override
141                public Iterable<Stats> children() {
142                        return Collections.emptyList();
143                }
144 
145                @Override
146                public long getPosts() {
147                        return posts.get();
148                }
149 
150                @Override
151                public long getPostsDelta() {
152                        long currentPosts = posts.get();
153                        return currentPosts - postsBaseline.getAndSet(currentPosts);
154                }
155 
156                @Override
157                public long getConclusions() {
158                        return internalPosts.get();
159                }
160 
161                @Override
162                public long getConclusionsDelta() {
163                        long currentConclusions = internalPosts.get();
164                        return currentConclusions - conclusionsBaseline.getAndSet(currentConclusions);
165                }
166                
167                AtomicLong handlersFired = new AtomicLong();
168                AtomicLong handlersFiredBaseline = new AtomicLong();
169                
170                @Override
171                public long getHandlersFired() {
172                        return handlersFired.get();
173                }
174                
175                @Override
176                public long getHandlersFiredDelta() {
177                        long currentHandlersFired = handlersFired.get();
178                        return currentHandlersFired - handlersFiredBaseline.getAndSet(currentHandlersFired);
179                }
180 
181                @Override
182                public Date getStart() {
183                        return start;
184                }
185                
186        }
187        
188        private EventBusStatsImpl stats;
189 
190        private InferenceFilter<E, P, C, K, H, S> inferenceFilter;
191 
192        private Matcher<E, P, C, K, H, S> matcher;
193        
194        public AbstractEventBus(
195                        Class<E> eventType, 
196                        S store, 
197                        InferencePolicy inferencePolicy,
198                        InferenceFilter<E,P,C,K,H,S> inferenceFilter,
199                        StatsCollector statsCollector,
200                        TimeUnit statsTimeUnit,
201                        Matcher<E,P,C,K,H,S> matcher) {
202                this.store = store;
203                this.eventType = eventType;
204                this.inferencePolicy = inferencePolicy;
205                this.statsCollector = statsCollector;
206                if (statsCollector!=null) {
207                        stats = new EventBusStatsImpl();
208                        statsCollector.add(stats);
209                }
210                this.statsTimeUnit = statsTimeUnit;
211                this.inferenceFilter = inferenceFilter;
212                this.matcher = matcher;
213                this.matcher.setEventBus(this);
214        }
215        
216        @Override
217        public Class<E> getEventType() {
218                return eventType;
219        }
220        
221        public InferencePolicy getInferencePolicy() {
222                return inferencePolicy;
223        }
224        
225        /** Abstract methods **/
226                
227 
228        /**
229         * Instantiates master handle.
230         * @param event
231         * @return
232         */
233        protected abstract H newMasterHandle(PostCommand<E,P,C,K,H,S> postCommand);
234        
235        protected class CreateMasterHandleResult {
236                
237                private boolean isNew;
238                private H handle;
239                
240                public boolean isNew() {
241                        return isNew;
242                }
243                public H getHandle() {
244                        return handle;
245                }
246                
247                CreateMasterHandleResult(H handle, boolean isNew) {
248                        super();
249                        this.handle = handle;
250                        this.isNew = isNew;
251                }
252                
253                
254        }
255        
256        
257        /**
258         * Returns "root" executor service to submit predicate evaluation and handler execution tasks to.
259         */
260        protected abstract TrackingExecutorService getExecutorService();
261        
262        /**
263         * Creates a wrapper around the master executor service for task tracking purposes.
264         * @param master Master executor service
265         * @return
266         */
267        protected abstract TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name);
268                
269        /**
270         * Create a wrapper around the root executor service for task tracking purposes.
271         * @return executor service or null for the synchronous mode.
272         */
273        protected abstract TrackingExecutorService createExecutorService(boolean oneOff, String name);        
274                
275        /**
276         * Generates handler ID.
277         * @return
278         */
279        protected abstract K nextId();
280 
281        /**
282         * Helper interface for snapshot taking.
283         * @author Pavel Vlasov
284         *
285         * @param <K>
286         */
287        public interface DerivationEx<K> {
288                K getHandlerId();
289                List<K> getInputIds();
290        }
291                
292        @SuppressWarnings("unchecked")
293        protected void workingMemorySnapshot(StateSnapshot<E, P, C, K, H, S> snapshot) {
294                getStore().readLock().lock();
295                try {
296                        for (H handle: getStore().getAll()) {
297                                snapshot.event(handle.getId(), handle.getEvent(), handle.isDirectPost());
298                                for (Derivation<E,P,C> d: handle.getDerivations()) {
299                                        snapshot.derivation(handle.getId(), ((DerivationEx<K>) d).getHandlerId(), ((DerivationEx<K>) d).getInputIds());
300                                }
301                        }                        
302                } finally {
303                        getStore().readLock().unlock();
304                }
305        }
306 
307        // --- End abstract methods ---
308        
309        
310        public void takeSnapshot(Snapshot<E, P, C, K, H, S> snapshot) {
311                try {
312                        snapshot.start();
313                        
314                        // Step 1 Working memory
315                        if (snapshot instanceof StateSnapshot) {
316                                workingMemorySnapshot((StateSnapshot<E, P, C, K, H, S>) snapshot);
317                        }
318                        
319                        matcher.takeSnapshot(snapshot);
320                        
321                        snapshot.end(true);
322                } catch (Exception e) {
323                        snapshot.end(false);
324                        throw new DispatchNetworkException("Error taking snapshot: "+e, e);
325                }
326        }
327                
328        private ExceptionHandler exceptionHandler;
329        
330        /* (non-Javadoc)
331         * @see com.hammurapi.eventbus.EventBus#setExceptionHandler(com.hammurapi.eventbus.ExceptionHandler)
332         */
333        public void setExceptionHandler(ExceptionHandler exceptionHandler) {
334                this.exceptionHandler = exceptionHandler;
335        }
336        
337        /* (non-Javadoc)
338         * @see com.hammurapi.eventbus.EventBus#getExceptionHandler()
339         */
340        public ExceptionHandler getExceptionHandler() {
341                return exceptionHandler;
342        }
343        
344        // Constructor which takes executor service and collection factory for joiner.
345        // Borrow joiner from 
346        
347        /* (non-Javadoc)
348         * @see com.hammurapi.eventbus.EventBus#add(com.hammurapi.eventbus.EventHandler, C, boolean, com.hammurapi.extract.Predicate)
349         */
350        @Override
351        public K addHandler(final EventHandler<E, P, C, H, S> eventHandler) {
352                return matcher.addHandler(eventHandler);
353        }
354        
355//        private JoinNode createJoinNode(PredicateNode pn,
356//                        EventHandlerWrapper<E, P, C, K, H> handler) {
357//                // TODO Auto-generated method stub
358//                return null;
359//        }
360                
361        /* (non-Javadoc)
362         * @see com.hammurapi.eventbus.EventBus#remove(K)
363         */
364        public void removeHandlers(Iterable<K> keys) {
365                matcher.removeHandlers(keys);
366        }
367        
368        @Override
369        public void removeHandlers(K... keys) {
370                removeHandlers(Arrays.asList(keys));                
371        }
372        
373        protected abstract Lock getRtcLock();
374        
375        /* (non-Javadoc)
376         * @see com.hammurapi.eventbus.EventBus#post(E)
377         */
378        @Override
379        public H post(final E event, Predicate<E, S>... validators) {
380                final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
381                if (fineIsLoggable) {
382                        StringBuilder sb = new StringBuilder();
383                        sb.append("Client post, event: "+event);
384                        if (validators.length>0) {
385                                sb.append(", validators: "+Arrays.toString(validators));
386                        }
387                        logger.fine(sb.toString());
388                }
389                
390                if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
391                        getRtcLock().lock();
392                }
393                try {
394                        boolean exclusiveOrAfterRoot = InferencePolicy.AFTER_ROOT_EVENT.compareTo(inferencePolicy)<=0;
395                        PostCommand<E,P,C,K,H,S> postCommand = new PostCommand<E,P,C,K,H,S>(event, true, null, null, null, createInferenceContext(), validators);
396                        H ret = processInferenceCommand(postCommand);
397                        if (exclusiveOrAfterRoot) {
398                                TrackingExecutorService hes = postCommand.getInferenceContext().getExecutorService();
399                                if (hes!=null ) {
400                                        hes.join();
401                                }
402                                Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = postCommand.getInferenceContext().getInferenceCommandsQueue(); 
403                                while (!inferenceCommandsQueue.isEmpty()) {
404                                        for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
405                                                processInferenceCommand(conclusion);
406                                        }
407                                        if (hes!=null) {
408                                                hes.join();
409                                        }
410                                }
411                        }        
412                        return ret;
413                } catch (Exception e) {
414                        logger.log(Level.SEVERE, "Post error: "+e, e);
415                        if (getExceptionHandler()!=null) {
416                                getExceptionHandler().handleException(e);
417                        }
418                        return null;
419//                        masterHandle.handleException(e);                        
420                } finally {
421                        if (InferencePolicy.EXCLUSIVE.equals(inferencePolicy)) {
422                                getRtcLock().unlock();
423                        }                                                                
424                }
425        }        
426        
427        /**
428         * @param command command to be processed
429         * @param executorService Executor service for asynchronous processing
430         * @param inferenceCommandsQueue collector of inference commands for further processing. 
431         */
432        @SuppressWarnings("unchecked")
433        protected H processInferenceCommand(InferenceCommand<E, P, C, K, H, S> command) {
434                // Filter commands.
435                if (inferenceFilter!=null && !inferenceFilter.accept(command, this)) {
436                        return null;
437                }
438                if (command instanceof PostCommand) {
439                        PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) command;
440                        if (stats!=null) {
441                                if (postCommand.isDirectPost()) {
442                                        stats.posts.incrementAndGet();
443                                } else {
444                                        stats.internalPosts.incrementAndGet();
445                                }
446                        }
447                        
448                        try {
449                                Handle<E,P,C,K> handle = postCommand.isHandleMode() ? postCommand.getHandle() : createMasterHandle(postCommand);
450                                if (postCommand.isDirectPost()) {
451                                        postCommand.getInferenceContext().setRootHandle(handle);
452                                }
453                                Iterable<EventHandlerWrapper<E, P, C, K, H, S>> handlers = matcher.match(handle.getEvent(), postCommand.getInferenceContext().getExecutorService());
454                                
455                                Set<P> consumingPriorities = new HashSet<P>();
456                                for (EventHandlerWrapper<E, P, C, K, H, S> ehw: handlers) {
457                                        if (ehw.consumes()) {
458                                                consumingPriorities.add(ehw.getPriority());
459                                        }
460                                }
461                                P prevPriority = null;
462                                final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
463                                InferenceContext<E, P, C, K, H, S> inferenceContext = postCommand.getInferenceContext().createNext();
464                                TrackingExecutorService es = inferenceContext.getExecutorService();
465                                for (EventHandlerWrapper<E, P, C, K, H, S> handler: handlers) {                                
466                                        // Wait for higher priority handlers to finish.
467                                        if (es!=null && prevPriority!=null && consumingPriorities.contains(prevPriority) && !handler.getPriority().equals(prevPriority)) {
468                                                es.join();
469                                                if (!handle.isValid()) {
470                                                        if (fineIsLoggable) {
471                                                                logger.fine("Event was consumed: "+handle.getEvent());
472                                                        }
473                                                                                                                
474                                                        break; // Event has been consumed.
475                                                }
476                                        }
477                                                
478                                        if (fineIsLoggable) {
479                                                logger.fine("Posting handler to execution: "+handler+" to process event "+handle.getEvent());
480                                        }
481                                        
482                                        if (stats!=null) {
483                                                stats.handlersFired.incrementAndGet();
484                                        }
485                                        if (es==null) {                                                
486                                                handler.post(null, inferenceContext, handle);                                                
487                                        } else {
488                                                es.execute(new HandlerTask<E,P,C,K,H,S>(handle, handler, inferenceContext));
489                                        }
490                                        prevPriority = handler.getPriority();
491                                }
492                                
493                                if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
494                                        if (es!=null) {
495                                                es.join();
496                                        }
497                                        Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
498                                        for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
499                                                processInferenceCommand(conclusion); 
500                                        }
501                                }
502                                return (H) handle;
503                        } catch (InterruptedException e) {
504                                throw new EventDispatchException(e);
505                        }
506                } 
507                
508                if (command instanceof RemoveCommand) {
509                        processRemoveCommand((RemoveCommand<E,P,C,K,H,S>) command);
510                        return null;
511                }
512                
513                if (command instanceof RetractCommand) {
514                        final boolean fineIsLoggable = logger.isLoggable(Level.FINE);
515                                        
516                        if (fineIsLoggable) {
517                                logger.fine("Posting retract command to execution: "+command);
518                        }
519                        
520                        if (stats!=null) {
521                                stats.handlersFired.incrementAndGet();
522                        }
523                        
524                        InferenceContext<E, P, C, K, H, S> inferenceContext = command.getInferenceContext().createNext();
525                        TrackingExecutorService es = inferenceContext.getExecutorService();
526                        
527                        if (es==null) {
528                                RetractTask.processCommand((RetractCommand<E,P,C,K,H,S>) command, inferenceContext);                                                
529                        } else {
530                                es.execute(new RetractTask<E,P,C,K,H,S>((RetractCommand<E,P,C,K,H,S>) command, inferenceContext));
531                        }
532                        
533                        if (InferencePolicy.AFTER_EVENT.equals(inferencePolicy)) {
534                                if (es!=null) {
535                                        try {
536                                                es.join();
537                                        } catch (InterruptedException e) {
538                                                throw new EventDispatchException(e);
539                                        }
540                                }
541                                Queue<InferenceCommand<E, P, C, K, H, S>> inferenceCommandsQueue = inferenceContext.getInferenceCommandsQueue();
542                                for (InferenceCommand<E,P,C,K,H,S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) {
543                                        processInferenceCommand(conclusion); 
544                                }
545                        }                        
546                        
547                        return null;
548                }
549                
550                throw new IllegalArgumentException("Unexpected command: "+command);
551        }
552        
553        // A trick to avoid exposure of local handle methods.
554        protected abstract void processRemoveCommand(RemoveCommand<E,P,C,K,H,S> command);
555        
556        /**
557         * Creates and returns new master handle. 
558         * re-dispatch existing event. 
559         * @param event Event to add to the store.
560         * @param joinDelegate
561         * @param directPost
562         * @param validators
563         * @param derivation 
564         * @return Master handle for new event, null for existing.
565         */
566        protected H createMasterHandle(PostCommand<E,P,C,K,H,S> postCommand) {
567                getStore().writeLock().lock();
568                try {
569                        H handle = null;
570                        if (getStore().getPrimaryKeyExtractor()==null) {
571                                for (H eh: getStore()) {
572                                        if (eh.isValid() && postCommand.getEvent().equals(eh.getEvent())) {
573                                                handle = eh;
574                                                break;
575                                        }
576                                }
577                        } else {
578                                handle = getStore().getByPrimaryKey(postCommand.getEvent());
579                        }
580                        
581                        if (handle!=null && !postCommand.isDirectPost()) {                                
582                                handle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs());
583                                return handle;
584                        }
585                        
586                        handle = newMasterHandle(postCommand);
587                        Predicate<H, S>[] va = new Predicate[postCommand.getValidators().length];
588                        final Reference<E> eventReference = new WeakReference<E>(postCommand.getEvent()); 
589                        for (int i=0; i<postCommand.getValidators().length; ++i) {
590                                final Predicate<E, S> validator = postCommand.getValidators()[i];
591                                
592                                va[i] = new Predicate<H, S>() {
593 
594                                        @Override
595                                        public Boolean extract(
596                                                        S context,
597                                                        Map<S, Map<Extractor<H, ? super Boolean, S>, ? super Boolean>> cache,
598                                                        H... obj) {
599                                                E event = eventReference.get();
600                                                if (event==null) {
601                                                        return false;
602                                                }
603                                                return validator.extract(context, null, Util.wrap(event));
604                                        }
605 
606                                        @Override
607                                        public Set<Integer> parameterIndices() {
608                                                return validator.parameterIndices();
609                                        }
610 
611                                        @Override
612                                        public boolean isContextDependent() {
613                                                return validator.isContextDependent();
614                                        }
615 
616                                        @Override
617                                        public ComparisonResult compareTo(Extractor<H, Boolean, S> other) {
618                                                return ComparisonResult.NOT_EQUAL_NM; // Doesn't matter.
619                                        }
620 
621                                        @Override
622                                        public double getCost() {
623                                                return validator.getCost();
624                                        }
625                                        
626                                };
627                        }
628                        Store.Handle<H, E, S> sh = getStore().put(handle, va);
629                        handle.setStoreHandle(sh);
630                        return handle;
631                } finally {
632                        getStore().writeLock().unlock();
633                }
634        }        
635        
636        /* (non-Javadoc)
637         * @see com.hammurapi.eventbus.EventBus#reset()
638         */
639        public void reset() {
640                getStore().clear();
641                matcher.reset();
642        }
643        
644//        public void rebuildDispatchNetwork() {
645//                getBusLock().writeLock().lock();
646//                try {
647//                        rootNode.rebuild();
648//                } finally {
649//                        getBusLock().writeLock().unlock();
650//                }                
651//        }
652        
653        /**
654         * Store entry complements store handle to provide functionality of event handle. 
655         * @author Pavel Vlasov
656         *
657         */
658        public interface StoreEntry<E,P extends Comparable<P>,C,K> {
659                
660                /**
661                 * @return Derivations of this event.
662                 */
663                Collection<Derivation<E,P,C>> getDerivations();
664                                
665                boolean isDerivedFrom(E event);
666                
667                /**
668                 * Sets new event. Invoked by substitute(). 
669                 * For internal use, shall not be invoked by client code.
670                 * @param event
671                 */
672                void setEvent(E event);
673                                                                        
674                /**
675                 * Invalidates this handle, creates a new handle
676                 * for updated event, posts updated event to the bus,
677                 * For internal use, shall not be invoked by client code.
678                 * @return New handle for updated event.
679                 */
680                Handle<E,P,C,K> update();
681                
682                /**
683                 * @return Event id.
684                 */
685                K getId();
686        }
687        
688        /**
689         * Event handle. 
690         * @author Pavel Vlasov
691         *
692         */
693        public interface Handle<E,P extends Comparable<P>,C,K> extends EventBus.Handle<E, P, C>, ExceptionHandler {
694                
695//                /**
696//                 * Sets new event. Invoked by substitute(). 
697//                 * For internal use, shall not be invoked by client code.
698//                 * @param event
699//                 */
700//                void setEvent(E event);
701                                                        
702                /**
703                 * For internal use, shall not be invoked by client code.
704                 * @return New handle for updated event.
705                 */
706                void update();
707                
708                K getId();
709                
710                /**
711                 * Callback method. For internal use.
712                 * @param storeHandle
713                 */
714                <H extends AbstractEventBus.Handle<E,P,C,K>, S extends EventStore<E, P, C, H, S>> void setStoreHandle(Store.Handle<H, E, S> storeHandle);
715                
716                /**
717                 * For internal use.
718                 * @param derivation
719                 */
720                void addDerivation(K handlerId, EventHandler<E, P, C, ?, ?> handler, Handle<E,P,C,K>[] inputs);
721                
722                /**
723                 * @return true if this handle's event was posted by client code.
724                 */
725                boolean isDirectPost();
726                
727                /**
728                 * Sets direct post to true. For internal use.
729                 */
730                void setDirectPost();
731        }
732        
733        /* (non-Javadoc)
734         * @see com.hammurapi.eventbus.EventBus#getDerivations(E)
735         */
736        public Collection<Derivation<E,P,C>> getDerivations(E event) {
737                H handle = null;
738                if (getStore().getPrimaryKeyExtractor()==null) {
739                        for (H sh: getStore()) {
740                                if (event.equals(sh.getEvent())) {
741                                        handle = sh;
742                                        break;
743                                }
744                        }
745                } else {
746                        handle = getStore().getByPrimaryKey(event);
747                }
748                
749                if (handle==null) {
750                        return Collections.emptyList();
751                }
752                return handle.getDerivations();
753        }
754 
755        @Override
756        public void join() throws InterruptedException {
757                if (getExecutorService()!=null) {
758                        getExecutorService().join();
759                }
760        }
761        
762        public boolean join(long timeout) throws InterruptedException {
763                return getExecutorService().join(timeout);
764        }
765        
766        private S store;
767         
768        @Override
769        public S getStore() {
770                return store;
771        }
772 
773        @Override
774        public void manageHandlers(HandlerManager<E, P, C, K, H, S> handlerManager) {
775                matcher.manageHandlers(handlerManager);
776        }
777 
778        /**
779         * Creates inference context with zero chain length and no root handle.
780         * Inference queue is supplied with AFTER_EVENT, AFTER_ROOT_EVENT and EXCLUSIVE policies.
781         * Use this method to create initial/root inference context.
782         * @return
783         */
784        protected abstract InferenceContext<E,P,C,K,H,S> createInferenceContext();
785        
786}

[all classes][com.hammurapi.eventbus]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov