EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.eventbus.tests]

COVERAGE SUMMARY FOR SOURCE FILE [LocalEventBusTests.java]

name | class, % | method, % | block, % | line, %
LocalEventBusTests.java | 100% (10/10) | 99%  (76/77) | 98%  (5611/5747) | 98%  (1027.7/1052)

COVERAGE BREAKDOWN BY CLASS AND METHOD

name | class, % | method, % | block, % | line, %
     
class LocalEventBusTests$4100% (1/1)100% (2/2)93%  (13/14)96%  (2.9/3)
convert (Object): Observable 100% (1/1)88%  (7/8)87%  (0.9/1)
LocalEventBusTests$4 (LocalEventBusTests): void 100% (1/1)100% (6/6)100% (2/2)
     
class LocalEventBusTests$StringBusFactory100% (1/1)80%  (4/5)97%  (98/101)96%  (26/27)
isUseExecutor (): boolean 0%   (0/1)0%   (0/3)0%   (0/1)
LocalEventBusTests$StringBusFactory (LocalEventBusTests, InferencePolicy, Inf... 100% (1/1)100% (24/24)100% (8/8)
access$0 (LocalEventBusTests$StringBusFactory): int 100% (1/1)100% (3/3)100% (1/1)
createBus (): LocalEventBus 100% (1/1)100% (68/68)100% (16/16)
isUseSimpleMatcher (): boolean 100% (1/1)100% (3/3)100% (1/1)
     
class LocalEventBusTests100% (1/1)100% (45/45)98%  (5208/5340)98%  (941.8/965)
testFaulty (): void 100% (1/1)22%  (9/40)3%   (0.2/8)
testRemoveWithHandle (): void 100% (1/1)79%  (104/132)86%  (21.5/25)
testRemoveWithBus (): void 100% (1/1)79%  (105/133)86%  (21.6/25)
testRemoveHandlers (): void 100% (1/1)89%  (202/228)93%  (42/45)
testConsumeJoin (): void 100% (1/1)94%  (208/222)91%  (46.6/51)
testReset (): void 100% (1/1)95%  (99/104)95%  (20.8/22)
LocalEventBusTests (): void 100% (1/1)100% (3/3)100% (1/1)
_testCompiledFamilyTies (InferencePolicy, boolean, boolean, int, boolean): void 100% (1/1)100% (465/465)100% (66/66)
_testFamilyTies (InferencePolicy, boolean, boolean, int, boolean): void 100% (1/1)100% (463/463)100% (67/67)
access$0 (LocalEventBusTests): ExecutorService 100% (1/1)100% (3/3)100% (1/1)
createObjectBus (InferenceFilter): Iterable 100% (1/1)100% (136/136)100% (12/12)
createStringBus (InferenceFilter): Iterable 100% (1/1)100% (136/136)100% (12/12)
setUp (): void 100% (1/1)100% (5/5)100% (2/2)
takeSnapshot (LocalEventBus, String): void 100% (1/1)100% (10/10)100% (2/2)
tearDown (): void 100% (1/1)100% (4/4)100% (2/2)
testBinderCompiler (): void 100% (1/1)100% (15/15)100% (3/3)
testCompiledFamilyTies (): void 100% (1/1)100% (87/87)100% (13/13)
testCompiledJoin (): void 100% (1/1)100% (155/155)100% (29/29)
testFamilyTies (): void 100% (1/1)100% (87/87)100% (13/13)
testFamilyTiesBindingCompilation (): void 100% (1/1)100% (57/57)100% (10/10)
testFamilyTiesJmx (): void 100% (1/1)100% (421/421)100% (71/71)
testFireJoinRemoveHandler (): void 100% (1/1)100% (116/116)100% (24/24)
testHandleJoin (): void 100% (1/1)100% (113/113)100% (22/22)
testInifinite (): void 100% (1/1)100% (69/69)100% (12/12)
testJoin (): void 100% (1/1)100% (157/157)100% (29/29)
testJoinWithCost (): void 100% (1/1)100% (99/99)100% (19/19)
testLoop (): void 100% (1/1)100% (85/85)100% (16/16)
testMoreRestrictive (): void 100% (1/1)100% (68/68)100% (12/12)
testOneOff (): void 100% (1/1)100% (64/64)100% (11/11)
testOppositeChaining (): void 100% (1/1)100% (67/67)100% (12/12)
testParallelExecution (): void 100% (1/1)100% (112/112)100% (22/22)
testParameterNamesJoin (): void 100% (1/1)100% (101/101)100% (19/19)
testParameterizedStringHandlerWithMethodCondition (): void 100% (1/1)100% (83/83)100% (16/16)
testPost (): void 100% (1/1)100% (112/112)100% (20/20)
testPredicateChaining (): void 100% (1/1)100% (67/67)100% (12/12)
testPriority (): void 100% (1/1)100% (88/88)100% (14/14)
testRemoveHandler (): void 100% (1/1)100% (72/72)100% (16/16)
testSimpleStringHandler (): void 100% (1/1)100% (65/65)100% (11/11)
testSimpleStringHandler2 (): void 100% (1/1)100% (60/60)100% (11/11)
testStringHandlerWithMethodCondition (): void 100% (1/1)100% (69/69)100% (13/13)
testStringHandlerWithParameterCondition (): void 100% (1/1)100% (69/69)100% (13/13)
testTokenParameterizedStringHandlerWithMethodCondition (): void 100% (1/1)100% (99/99)100% (19/19)
testUpdateFireRemoveHandlers (): void 100% (1/1)100% (203/203)100% (49/49)
testUpdateFromHandler (): void 100% (1/1)100% (195/195)100% (46/46)
testUpdateThroughObservable (): void 100% (1/1)100% (201/201)100% (47/47)
     
class LocalEventBusTests$1100% (1/1)100% (2/2)100% (25/25)100% (4/4)
LocalEventBusTests$1 (LocalEventBusTests, double, TimeUnit, boolean, int [], ... 100% (1/1)100% (13/13)100% (2/2)
extractInternal (Object, Map, Object []): Boolean 100% (1/1)100% (12/12)100% (2/2)
     
class LocalEventBusTests$2100% (1/1)100% (2/2)100% (25/25)100% (5/5)
LocalEventBusTests$2 (LocalEventBusTests, Thread [], Object []): void 100% (1/1)100% (12/12)100% (2/2)
post (EventDispatchContext, Object []): void 100% (1/1)100% (13/13)100% (3/3)
     
class LocalEventBusTests$3100% (1/1)100% (2/2)100% (21/21)100% (4/4)
LocalEventBusTests$3 (LocalEventBusTests, int, Integer, Object, boolean, bool... 100% (1/1)100% (16/16)100% (2/2)
post (EventDispatchContext, Object []): void 100% (1/1)100% (5/5)100% (2/2)
     
class LocalEventBusTests$ObjectBusFactory100% (1/1)100% (11/11)100% (157/157)100% (37/37)
LocalEventBusTests$ObjectBusFactory (LocalEventBusTests, InferencePolicy, Inf... 100% (1/1)100% (24/24)100% (8/8)
access$0 (LocalEventBusTests$ObjectBusFactory): InferencePolicy 100% (1/1)100% (3/3)100% (1/1)
access$1 (LocalEventBusTests$ObjectBusFactory): int 100% (1/1)100% (3/3)100% (1/1)
access$2 (LocalEventBusTests$ObjectBusFactory): boolean 100% (1/1)100% (3/3)100% (1/1)
access$3 (LocalEventBusTests$ObjectBusFactory): boolean 100% (1/1)100% (3/3)100% (1/1)
createBus (): LocalEventBus 100% (1/1)100% (72/72)100% (17/17)
getInferencePolicy (): InferencePolicy 100% (1/1)100% (3/3)100% (1/1)
isUseExecutor (): boolean 100% (1/1)100% (3/3)100% (1/1)
isUseSimpleMatcher (): boolean 100% (1/1)100% (3/3)100% (1/1)
setObservableConverter (ObservableConverter): void 100% (1/1)100% (4/4)100% (2/2)
toString (): String 100% (1/1)100% (36/36)100% (3/3)
     
class LocalEventBusTests$ObjectBusFactory$1100% (1/1)100% (1/1)100% (7/7)100% (2/2)
LocalEventBusTests$ObjectBusFactory$1 (LocalEventBusTests$ObjectBusFactory, L... 100% (1/1)100% (7/7)100% (2/2)
     
class LocalEventBusTests$ObservableStringReference100% (1/1)100% (6/6)100% (50/50)100% (14/14)
LocalEventBusTests$ObservableStringReference (String): void 100% (1/1)100% (11/11)100% (4/4)
addObserver (Observer): void 100% (1/1)100% (6/6)100% (2/2)
deleteObserver (Observer): void 100% (1/1)100% (6/6)100% (2/2)
get (): String 100% (1/1)100% (3/3)100% (1/1)
getObservers (): Collection 100% (1/1)100% (3/3)100% (1/1)
set (String): void 100% (1/1)100% (21/21)100% (4/4)
     
class LocalEventBusTests$StringBusFactory$1100% (1/1)100% (1/1)100% (7/7)100% (2/2)
LocalEventBusTests$StringBusFactory$1 (LocalEventBusTests$StringBusFactory, L... 100% (1/1)100% (7/7)100% (2/2)

1/**
2 * 
3 */
4package com.hammurapi.eventbus.tests;
5 
6import static junit.framework.Assert.assertEquals;
7import static junit.framework.Assert.assertFalse;
8import static junit.framework.Assert.assertNotNull;
9import static junit.framework.Assert.assertNotSame;
10import static junit.framework.Assert.assertSame;
11import static junit.framework.Assert.assertTrue;
12import static junit.framework.Assert.fail;
13 
14import java.io.File;
15import java.lang.management.ManagementFactory;
16import java.util.ArrayList;
17import java.util.Collection;
18import java.util.Collections;
19import java.util.HashMap;
20import java.util.HashSet;
21import java.util.Map;
22import java.util.Set;
23import java.util.concurrent.ExecutorService;
24import java.util.concurrent.Executors;
25import java.util.concurrent.atomic.AtomicInteger;
26import java.util.concurrent.atomic.AtomicReference;
27 
28import org.junit.After;
29import org.junit.Before;
30import org.junit.Test;
31 
32import com.hammurapi.common.MapTokenSource;
33import com.hammurapi.common.Observable;
34import com.hammurapi.common.ObservableConverter;
35import com.hammurapi.common.Observer;
36import com.hammurapi.common.TokenExpander;
37import com.hammurapi.common.TokenExpander.TokenSource;
38import com.hammurapi.eventbus.AbstractEventBus.Handle;
39import com.hammurapi.eventbus.Derivation;
40import com.hammurapi.eventbus.DuplicatesFilter;
41import com.hammurapi.eventbus.EventDispatchContext;
42import com.hammurapi.eventbus.EventDispatchException;
43import com.hammurapi.eventbus.EventHandlerBase.Mode;
44import com.hammurapi.eventbus.InferenceChainLengthFilter;
45import com.hammurapi.eventbus.InferenceFilter;
46import com.hammurapi.eventbus.InferencePolicy;
47import com.hammurapi.eventbus.JavaBinderCompiler;
48import com.hammurapi.eventbus.local.LocalAbstractEventHandler;
49import com.hammurapi.eventbus.local.LocalDispatchNetworkDotSnapshot;
50import com.hammurapi.eventbus.local.LocalEventBus;
51import com.hammurapi.eventbus.local.LocalEventStore;
52import com.hammurapi.eventbus.local.LocalEventStoreImpl;
53import com.hammurapi.eventbus.local.LocalIntrospector;
54import com.hammurapi.eventbus.local.LocalSimpleMatcher;
55import com.hammurapi.eventbus.monitoring.JmxStatsCollector;
56import com.hammurapi.eventbus.snapshot.io.SnapshotOutput;
57import com.hammurapi.eventbus.tests.familyties.FamilyTiesDispatchNetworkDotSnapshot;
58import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventBus;
59import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStore;
60import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStoreImpl;
61import com.hammurapi.eventbus.tests.familyties.FamilyTiesIntrospector;
62import com.hammurapi.eventbus.tests.familyties.model.Child;
63import com.hammurapi.eventbus.tests.familyties.model.Person;
64import com.hammurapi.eventbus.tests.familyties.model.Relative;
65import com.hammurapi.eventbus.tests.familyties.model.Spouse;
66import com.hammurapi.eventbus.tests.familyties.rules.DaughterRule;
67import com.hammurapi.eventbus.tests.familyties.rules.DaughterRuleJavaBinder;
68import com.hammurapi.eventbus.tests.familyties.rules.FamilyTiesRules;
69import com.hammurapi.eventbus.tests.familyties.rules.GrandRules;
70import com.hammurapi.eventbus.tests.familyties.rules.GrandRulesJavaBinder;
71import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRules;
72import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRulesJavaBinder;
73import com.hammurapi.eventbus.tests.familyties.rules.ParentRules;
74import com.hammurapi.eventbus.tests.familyties.rules.ParentRulesJavaBinder;
75import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRules;
76import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRulesJavaBinder;
77import com.hammurapi.eventbus.tests.familyties.rules.SiblingRules;
78import com.hammurapi.eventbus.tests.familyties.rules.SiblingRulesJavaBinder;
79import com.hammurapi.eventbus.tests.familyties.rules.SonRule;
80import com.hammurapi.eventbus.tests.familyties.rules.SonRuleJavaBinder;
81import com.hammurapi.eventbus.tests.familyties.rules.SpouseRules;
82import com.hammurapi.eventbus.tests.familyties.rules.SpouseRulesJavaBinder;
83import com.hammurapi.eventbus.tests.fastfood.Cashier;
84import com.hammurapi.eventbus.tests.fastfood.Dish;
85import com.hammurapi.eventbus.tests.fastfood.Kitchen;
86import com.hammurapi.eventbus.tests.fastfood.Order;
87import com.hammurapi.eventbus.tests.fastfood.OrderFulfiller;
88import com.hammurapi.extract.AbstractPredicate;
89import com.hammurapi.extract.Extractor;
90import com.hammurapi.extract.InstanceOfPredicate;
91import com.hammurapi.extract.Predicate;
92 
93/**
94 * @author Pavel Vlasov
95 *
96 */
97public class LocalEventBusTests {
98        
99        private ExecutorService executorService;
100 
101        /**
            * JUnit {@code @Before} fixture: creates a fixed pool of two threads and stores it
            * in {@code executorService} before each test method.
            *
102         * @throws java.lang.Exception declared by the signature; the visible body throws no checked exception
103         */
104        @Before
105        public void setUp() throws Exception {
106//                BlockingQueue<Runnable> wq = new ArrayBlockingQueue<Runnable>(1000);
107                 executorService = Executors.newFixedThreadPool(2); // presumably consumed by the bus factories defined later in the file - confirm
108        }
109 
110        /**
            * JUnit {@code @After} fixture: requests an orderly shutdown of the executor
            * created in {@code setUp()} after each test method.
            *
111         * @throws java.lang.Exception declared by the signature; the visible body throws no checked exception
112         */
113        @After
114        public void tearDown() throws Exception {
115                 executorService.shutdown(); // NOTE(review): shutdown() does not wait for running tasks; no awaitTermination follows - confirm this is intended
116        }        
117        
118        @Test
           // For every object-bus factory that uses an executor together with the IMMEDIATELY
           // inference policy: registers a handler guarded by a predicate, posts one event, and
           // checks that the handler ran on a thread other than the test thread and received
           // the very same event instance.
119        public void testParallelExecution() throws InterruptedException {
120                for (ObjectBusFactory obf: createObjectBus(null)) {
121                        if (!obf.isUseExecutor() || obf.inferencePolicy!=InferencePolicy.IMMEDIATELY) { // all other configurations are skipped
122                                continue;
123                        }
124                        
125                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
126                        
127                        final Thread[] invocationThreads = new Thread[3]; // [0] test thread, [1] predicate thread, [2] handler thread
128                        invocationThreads[0] = Thread.currentThread();
129                        
130                        final Object[] handled = {null}; // single-slot holder so the anonymous handler can report the event it received
131                        /**
132                         * Predicate is used to record its invocation thread.
133                         */
134                        Predicate<Object,Object> predicate = new AbstractPredicate<Object, Object>(0, null, false, 0) {
135        
136                                @Override
137                                protected Boolean extractInternal(
138                                                Object context,
139                                                Map<Object, Map<Extractor<Object, ? super Boolean, Object>, ? super Boolean>> cache,
140                                                Object... obj) 
141                                {                                
142                                        invocationThreads[1] = Thread.currentThread();
143                                        return "Test event".equals(obj[0]); // matches only the event posted below
144                                }
145                                
146                        };
147        
148                        /**
149                         * Handler which does nothing, just records its invocation thread.
150                         */
151                         LocalAbstractEventHandler<Object, Integer, Object> handler = new LocalAbstractEventHandler<Object, Integer, Object>() {
152        
153                                @Override
154                                public void post(
155                                                EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
156                                                Object... events) {
157                                        invocationThreads[2] = Thread.currentThread();
158                                        handled[0] = events[0];
159                                }
160        
161                        };
162                        
163                        handler.setPredicate(predicate);
164        
165                        bus.addHandler(handler);
166                        
167                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
168                                takeSnapshot(bus, "snapshots\\snapshot1_"+obf.id+".dot"); // NOTE(review): '\\' is a Windows-style separator - confirm the resulting location on other platforms
169                        }
170                        
171                        final String event = "Test event";
172                        bus.post(event);
173                        
174                        // Without delay tasks will be most likely executed in the main thread.
175                        Thread.sleep(100); // NOTE(review): fixed pause makes the outcome timing-dependent
176                        
177                        bus.join(); // presumably blocks until dispatching has finished - confirm against LocalEventBus.join()
178                        
179                        assertNotNull(invocationThreads[1]); // the predicate was evaluated
180                        assertNotNull(invocationThreads[2]); // the handler was invoked
181//                        assertNotSame(invocationThreads[0], invocationThreads[1]); // Can't guarantee that it'll be executed by another thread.
182                        assertNotSame(invocationThreads[0], invocationThreads[2]); // the handler did not run on the test thread
183                        assertSame(event, handled[0]); // identity check: the same instance that was posted
184                }
185        }
186 
187        private void takeSnapshot(LocalEventBus<Object, Integer, Object> bus, String fileName) {
                   // Helper: wraps fileName in a File, builds a LocalDispatchNetworkDotSnapshot on it
                   // and passes that snapshot to bus.takeSnapshot(...).
188                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File(fileName)));
189        }
190        
191        @Test
           // For every String-typed bus configuration: binds a SimpleStringHandler through a
           // LocalIntrospector, posts a single string and expects the handler counter to be 1.
192        public void testSimpleStringHandler() throws InterruptedException {
193                for (StringBusFactory sbf: createStringBus(null)) {
194                        LocalEventBus<String, Integer, Object> bus = sbf.createBus();
195                
196                        LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);                
197                        SimpleStringHandler handler = new SimpleStringHandler();
198                        introspector.bind(handler, bus);
199                        
200                        if (!sbf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
201                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\SimpleStringHandler_"+sbf.id+".dot"))); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
202                        }
203                        
204                        bus.post("Hello world!");
205                        bus.join(); // presumably waits for dispatch to finish before the assertion - confirm
206                        
207                        assertEquals(1, handler.getCounter());
208                }
209        }
210        
211        @Test
           // Same scenario as testSimpleStringHandler, but run against every Object-typed bus
           // configuration: one string is posted and the handler counter is expected to be 1.
212        public void testSimpleStringHandler2() throws InterruptedException {
213                for (ObjectBusFactory obf: createObjectBus(null)) {
214                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
215                
216                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
217                        SimpleStringHandler handler = new SimpleStringHandler();
218                        introspector.bind(handler, bus);
219                        
220                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
221                                takeSnapshot(bus, "snapshots\\SimpleStringHandler2_"+obf.id+".dot"); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
222                        }
223                        
224                        bus.post("Hello world!");
225                        bus.join(); // presumably waits for dispatch to finish before the assertion - confirm
226                        
227                        assertEquals(1, handler.getCounter());
228                }
229        }
230        
231        @Test
           // For every Object-typed bus configuration: binds a StringHandlerWithMethodCondition,
           // posts "Hello" and "World", and expects exactly one counted invocation plus isOk().
           // Which of the two events is accepted is decided by the handler class, not shown here.
232        public void testStringHandlerWithMethodCondition() throws InterruptedException {
233                for (ObjectBusFactory obf: createObjectBus(null)) {
234                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
235                
236                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
237                        StringHandlerWithMethodCondition handler = new StringHandlerWithMethodCondition();
238                        introspector.bind(handler, bus);
239                        
240                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
241                                takeSnapshot(bus, "snapshots\\StringHandlerWithMethodCondition_"+obf.id+".dot"); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
242                        }
243                        
244                        bus.post("Hello");
245                        bus.post("World");
246                        bus.join(); // presumably waits for dispatch to finish before the assertions - confirm
247                        
248                        assertEquals(1, handler.getCounter()); // two events posted, one expected to be counted
249                        assertTrue(handler.isOk());
250                }
251        }
252        
253        @Test
           // For every Object-typed bus configuration: binds a StringHandlerWithParameterCondition,
           // posts "Hello" and "World", and expects exactly one counted invocation plus isOk().
           // The condition itself lives in the handler class, which is not shown here.
254        public void testStringHandlerWithParameterCondition() throws InterruptedException {
255                for (ObjectBusFactory obf: createObjectBus(null)) {
256                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
257                
258                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
259                        StringHandlerWithParameterCondition handler = new StringHandlerWithParameterCondition();
260                        introspector.bind(handler, bus);
261                        
262                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
263                                takeSnapshot(bus, "snapshots\\StringHandlerWithParameterCondition_"+obf.id+".dot"); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
264                        }
265                        
266                        bus.post("Hello");
267                        bus.post("World");
268                        bus.join(); // presumably waits for dispatch to finish before the assertions - confirm
269                        
270                        assertEquals(1, handler.getCounter()); // two events posted, one expected to be counted
271                        assertTrue(handler.isOk());
272                }
273        }
274        
275        @Test
           // For every Object-typed bus configuration: configures a
           // ParameterizedStringHandlerWithMethodCondition with filter string "Hello" before binding,
           // posts "Hello" and "World", and expects exactly one counted invocation plus isOk().
276        public void testParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
277                for (ObjectBusFactory obf: createObjectBus(null)) {
278                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
279                
280                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
281                        ParameterizedStringHandlerWithMethodCondition handler = new ParameterizedStringHandlerWithMethodCondition();
282                        handler.setFilterStr("Hello"); // set before bind(); presumably read by the handler's condition - confirm
283                        introspector.bind(handler, bus);
284                        
285                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
286                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot")); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
287                                snapshot.setGraphAttribute("dpi", "70");
288                                bus.takeSnapshot(snapshot);
289                        }
290                        
291                        bus.post("Hello");
292                        bus.post("World");
293                        bus.join(); // presumably waits for dispatch to finish before the assertions - confirm
294                        
295                        assertEquals(1, handler.getCounter()); // two events posted, one expected to be counted
296                        assertTrue(handler.isOk());
297                }
298        }
299        
300        
301        @Test
           // For every Object-typed bus configuration: builds a TokenExpander over a map that
           // defines the token "filterStr", hands it to the LocalIntrospector, binds a
           // TokenParameterizedStringHandlerWithMethodCondition, posts "Hello" and "World", and
           // expects exactly one counted invocation plus isOk().
302        public void testTokenParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
303                for (ObjectBusFactory obf: createObjectBus(null)) {
304                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
305                
306                        Map<String, String> map = new HashMap<String, String>();
307                        map.put("filterStr", "\"Hello\""); // the value includes the double quotes; presumably substituted into a condition expression - confirm
308                        TokenSource ts = new MapTokenSource(map); 
309                        TokenExpander te = new TokenExpander(ts);
310        
311                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, te);                
312                        TokenParameterizedStringHandlerWithMethodCondition handler = new TokenParameterizedStringHandlerWithMethodCondition();
313                                        
314                        introspector.bind(handler, bus);
315                        
316                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
317                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\TokenParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot")); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
318                                snapshot.setGraphAttribute("dpi", "70");
319                                bus.takeSnapshot(snapshot);
320                        }
321                        
322                        bus.post("Hello");
323                        bus.post("World");
324                        bus.join(); // presumably waits for dispatch to finish before the assertions - confirm
325                        
326                        assertEquals(1, handler.getCounter()); // two events posted, one expected to be counted
327                        assertTrue(handler.isOk());
328                }
329        }
330        
331        @Test
           // For every Object-typed bus configuration: binds a StringHandlerWithMethodCondition and
           // a PostingHandler to the same bus, posts only "World", and expects each of the three
           // counters (world, em, hello) to be exactly 1 with the matching ok-flags set.
           // The hello counter reaching 1 although only "World" is posted suggests that
           // PostingHandler posts follow-up events - confirm in PostingHandler.
332        public void testPost() throws InterruptedException {
333                for (ObjectBusFactory obf: createObjectBus(null)) {
334                        
335                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
336                
337                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
338                        StringHandlerWithMethodCondition helloHandler = new StringHandlerWithMethodCondition();                
339                        introspector.bind(helloHandler, bus);
340                        
341                        PostingHandler ph = new PostingHandler();
342                        introspector.bind(ph, bus);
343                        
344                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
345                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Posting_"+obf.id+".dot")); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
346                                snapshot.setGraphAttribute("dpi", "70");
347                                bus.takeSnapshot(snapshot);
348                        }
349                        
350                        bus.post("World");
351                        bus.join(); // presumably waits for dispatch to finish before the assertions - confirm
352                        
353                        assertEquals(1, ph.getWorldCounter());
354                        assertTrue(ph.isWorldOk());
355                        
356                        assertEquals("OBF: "+obf, 1, ph.getEmCounter()); // failure message names the factory configuration
357                        assertTrue(ph.isEmOk());
358                        
359                        assertEquals(obf+",  ", 1, helloHandler.getCounter()); // failure message names the factory configuration
360                        assertTrue(helloHandler.isOk());
361                }
362 
363        }
364        
365        @Test
           // For every Object-typed bus configuration whose inference policy is not IMMEDIATELY:
           // binds a SlowPostingHandler, posts "World", joins on the returned handle (not on the
           // bus), then expects the handler counter to be 2 and the bus store to contain exactly
           // the events "Hello", "World" and "!".
366        public void testHandleJoin() throws InterruptedException {
367                for (ObjectBusFactory obf: createObjectBus(null)) {
368                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) { // IMMEDIATELY configurations are skipped
369                                continue;
370                        }
371                        
372                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
373                
374                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
375                        SlowPostingHandler ph = new SlowPostingHandler();
376                        introspector.bind(ph, bus);
377                        
378                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
379                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\HandleJoin_"+obf.id+".dot")); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
380                                snapshot.setGraphAttribute("dpi", "70");
381                                bus.takeSnapshot(snapshot);
382                        }
383                        
384                        Handle<Object, Integer, Object, Long> worldHandle = bus.post("World");
385                        worldHandle.join(); // presumably waits for everything triggered by this event - confirm against Handle.join()
386                        
387                        assertEquals(2, ph.getCounter());
388                        
389                        Set<String> words = new HashSet<String>(); // the events expected to be in the store
390                        words.add("Hello");
391                        words.add("World");
392                        words.add("!");
393                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
394                                assertTrue(words.remove(h.getEvent())); // fails on an unexpected or repeated stored event
395                        }
396                        assertTrue(words.isEmpty()); // fails if an expected event is missing from the store
397                }
398        }
399        
400        @Test
           // For every Object-typed bus configuration whose inference policy is not IMMEDIATELY:
           // binds a JoinHandler, posts two strings and two integers, joins the bus, and expects
           // the hello and join counters to be 1 with the matching ok-flags set. It then prints
           // the derivations of stored events whose string form starts with "Hello World".
401        public void testJoin() throws InterruptedException {
402                for (ObjectBusFactory obf: createObjectBus(null)) {
403                        
404                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) { // IMMEDIATELY configurations are skipped
405                                continue;
406                        }
407                        
408                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
409                
410                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
411                        JoinHandler joinHandler = new JoinHandler();                
412                        introspector.bind(joinHandler, bus);
413                        
414                        if (!obf.isUseSimpleMatcher()) { // a dot snapshot is written only when the simple matcher is not in use
415                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot")); // NOTE(review): '\\' is a Windows-style separator - confirm on other platforms
416                                snapshot.setGraphAttribute("dpi", "70");
417                                bus.takeSnapshot(snapshot);
418                        }
419                        
420                        bus.post("Hello");
421                        bus.post(20);
422                        bus.post(1000);
423                        bus.post("Good bye!");
424                        bus.join(); // presumably waits for dispatch to finish before the checks below - confirm
425                        
426                        // Browser snapshot
427                        if (!obf.useSimpleMatcher) { // NOTE(review): reads the field directly, whereas line 414 calls isUseSimpleMatcher()
428                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\Joining_"+obf.id+".snapshot"));
429                                bus.takeSnapshot(emfSnapshot);
430                        }
431                        
432                        assertEquals(1, joinHandler.getHelloCounter());
433                        assertEquals(1, joinHandler.getJoinCounter());
434                        
435                        assertTrue(joinHandler.isHelloOk());
436                        assertTrue(joinHandler.isJoinOk());
437                        
438                        // Derivation test
439                        for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
440                                Object event = h.getEvent();
441                                if (String.valueOf(event).startsWith("Hello World")) {
442                                        Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
443                                        System.out.println(derivations); // NOTE(review): derivations are only printed, nothing is asserted about them
444                                }
445                        }
446                }
447                
448        }
449        
450        @SuppressWarnings("unchecked")
           // Runs JavaBinderCompiler, constructed on the directory "generated.tests", for
           // JoinHandler against LocalEventBus. NOTE(review): there are no assertions, so the test
           // fails only if the call throws.
451        @Test
452        public void testBinderCompiler() {
453                JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
454                
455                compiler.compileJavaBinder(JoinHandler.class, LocalEventBus.class, null, null); // the unchecked suppression presumably covers the raw LocalEventBus.class argument - confirm
456        }
457        
458        @Test
           // Runs JavaBinderCompiler, constructed on the directory "generated.tests", for each of
           // the eight family-ties rule classes against FamilyTiesEventBus. NOTE(review): there are
           // no assertions, so the test fails only if one of the calls throws.
459        public void testFamilyTiesBindingCompilation() {
460                JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
461                
462                compiler.compileJavaBinder(DaughterRule.class, FamilyTiesEventBus.class, null, null);                
463                compiler.compileJavaBinder(GrandRules.class, FamilyTiesEventBus.class, null, null);                
464                compiler.compileJavaBinder(ParentChildRules.class, FamilyTiesEventBus.class, null, null);                
465                compiler.compileJavaBinder(ParentRules.class, FamilyTiesEventBus.class, null, null);                
466                compiler.compileJavaBinder(SecondaryRules.class, FamilyTiesEventBus.class, null, null);                
467                compiler.compileJavaBinder(SiblingRules.class, FamilyTiesEventBus.class, null, null);                
468                compiler.compileJavaBinder(SonRule.class, FamilyTiesEventBus.class, null, null);                
469                compiler.compileJavaBinder(SpouseRules.class, FamilyTiesEventBus.class, null, null);                
470        }
471        
472        @Test
473        public void testCompiledJoin() throws InterruptedException {
474                for (ObjectBusFactory obf: createObjectBus(null)) {
475                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
476                                continue;
477                        }
478                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
479                
480                        JoinHandler joinHandler = new JoinHandler();
481                        JoinHandlerJavaBinder joinHandlerJavaBinder = new JoinHandlerJavaBinder();
482                        joinHandlerJavaBinder.bind(joinHandler, bus);
483                        
484                        if (!obf.isUseSimpleMatcher()) {
485                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot"));
486                                snapshot.setGraphAttribute("dpi", "70");
487                                bus.takeSnapshot(snapshot);
488                        }
489                        
490                        bus.post("Hello");
491                        bus.post(20);
492                        bus.post(1000);
493                        bus.post("Good bye!");
494                        bus.join();
495                        
496                        if (!obf.isUseSimpleMatcher()) {
497                                // Browser snapshot
498                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledJoining"+obf.id+".snapshot"));
499                                bus.takeSnapshot(emfSnapshot);
500                        }
501                        
502                        assertEquals(1, joinHandler.getHelloCounter());
503                        assertEquals(1, joinHandler.getJoinCounter());
504                        
505                        assertTrue(joinHandler.isHelloOk());
506                        assertTrue(joinHandler.isJoinOk());
507                        
508                        // Derivation test
509                        for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
510                                Object event = h.getEvent();
511                                if (String.valueOf(event).startsWith("Hello World")) {
512                                        Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
513                                        System.out.println(derivations);
514                                }
515                        }
516                }                
517        }
518        
519        @Test
520        public void testJoinWithCost() throws InterruptedException {
521                for (ObjectBusFactory obf: createObjectBus(null)) {
522                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
523                                continue;
524                        }
525                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
526                
527                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
528                        JoinHandlerWithCost joinHandler = new JoinHandlerWithCost();                
529                        introspector.bind(joinHandler, bus);
530                        
531                        if (!obf.isUseSimpleMatcher()) {
532                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\JoiningWithCost_"+obf.id+".dot"));
533                                snapshot.setGraphAttribute("dpi", "70");
534                                bus.takeSnapshot(snapshot);
535                        }
536                        
537                        bus.post("Hello");
538                        bus.post(20);
539                        bus.post(1000);
540                        bus.post("World");
541                        bus.join();
542                        
543                        assertEquals(1, joinHandler.getJoinCounter());
544                        assertTrue(joinHandler.isJoinOk());
545                }
546        }
547        
548        @Test
549        public void testParameterNamesJoin() throws InterruptedException {
550                for (ObjectBusFactory obf: createObjectBus(null)) {
551                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
552                
553                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
554                        ParameterNamesJoinHandler joinHandler = new ParameterNamesJoinHandler();                
555                        introspector.bind(joinHandler, bus);
556                        
557                        if (!obf.isUseSimpleMatcher()) {
558                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterNamesJoining_"+obf.id+".dot"));
559                                snapshot.setGraphAttribute("dpi", "70");
560                                bus.takeSnapshot(snapshot);
561                        }
562                        
563                        bus.post("Hello");
564                        bus.post(20);
565                        bus.post(1000);
566                        bus.post("Good bye!");
567                        bus.join();
568                        
569                        assertEquals(1, joinHandler.getHelloCounter());
570                        assertEquals(1, joinHandler.getJoinCounter());
571                        
572                        assertTrue(joinHandler.isHelloOk());
573                        assertTrue(joinHandler.isJoinOk());
574                }
575                
576        }
577        
578        @Test
579        public void testPriority() throws InterruptedException {
580                for (ObjectBusFactory obf: createObjectBus(null)) {
581                        if (obf.inferencePolicy.compareTo(InferencePolicy.AFTER_HANDLER)>=0) {
582                                continue; // Priorities work only if inference commands are processed after handler or immediately.
583                        }
584                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
585                
586                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
587                        PriorityHandler priorityHandler = new PriorityHandler();                
588                        introspector.bind(priorityHandler, bus);
589                        
590                        bus.post("World");
591                        bus.post("Hello");
592                        bus.join();
593                        
594                        assertEquals(obf+", ", 1, priorityHandler.getLpCounter());
595                        assertEquals(obf+", ", 2, priorityHandler.getHpCounter());
596                        assertEquals(obf+", ", 2, priorityHandler.getAhpCounter());
597                }
598        }
599        
600        @Test
601        public void testOneOff() throws InterruptedException {
602                for (ObjectBusFactory obf: createObjectBus(null)) {
603                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
604                
605                        final AtomicInteger counter = new AtomicInteger();
606        
607                        LocalAbstractEventHandler<Object, Integer, Object> eh = new LocalAbstractEventHandler<Object, Integer, Object>(1, 0, null, false, true, Mode.POST) {
608        
609        
610                                @Override
611                                public void post(
612                                                EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
613                                                Object... events) {
614                                        
615                                        counter.incrementAndGet();                                
616                                }
617                        };
618                        
619                        bus.addHandler(eh);
620                        
621                        bus.post("World");
622                        bus.post("Hello");
623                        bus.post("!");
624                        bus.join();
625                        
626                        assertEquals(1, counter.get());
627                }
628        }
629        
630        @Test
631        public void testConsumeJoin() throws InterruptedException {
632                for (ObjectBusFactory obf: createObjectBus(null)) {
633                        if (!obf.pkStore) {
634                                continue;
635                        }
636                        
637                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
638                                continue;
639                        }
640                        // Only pkStore factories with a policy other than IMMEDIATELY reach this point.
641                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
642                        // Bind the OrderFulfiller under test to the bus.
643                        OrderFulfiller orderFulfiller = new OrderFulfiller();
644                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
645                        introspector.bind(orderFulfiller, bus);
646        
647                        if (!obf.isUseSimpleMatcher()) {
648                                LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ConsumeJoin_"+obf.id+".dot"));
649                                snapshot.setGraphAttribute("dpi", "70");
650                                bus.takeSnapshot(snapshot);
651                        }
652                        // Kitchen and Cashier each receive the bus and a total; NOTE(review): presumably they post that many dishes/orders on their own threads - confirm.
653                        int totalNumberOfDishes = 800;                
654                        Kitchen kitchen = new Kitchen(totalNumberOfDishes, bus);
655                        
656                        int totalNumberOfOrders = 500;
657                        Cashier cashier = new Cashier(totalNumberOfOrders, bus);
658                        
659                        kitchen.start();
660                        cashier.start();
661                        // Wait for both producers and for the bus, then pause a further 5 seconds (the reason is not visible here).
662                        kitchen.join();
663                        cashier.join();
664                        bus.join();
665                        Thread.sleep(5000);
666                        
667                        assertTrue(orderFulfiller.isOK());
668                        // Every order the fulfiller lists as fulfilled must report itself as fulfilled.
669                        int fulfilledOrders = 0;
670                        for (Order o: orderFulfiller.getFulfilledOrders()) {
671                                assertTrue(o.isFulfilled());
672                                ++fulfilledOrders;
673                        }
674                        // Orders still selected from the store must be unfulfilled; dishes they already hold are counted as partial.
675                        int partialOrderDishes = 0;
676                        int unfulfilledOrders = 0;
677                        Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> orderSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Order.class);
678                        for (Object event: bus.getStore().get(orderSelector, bus.getStore().getPrimaryKeyExtractor(), false, null)) {
679                                Order order = (Order) event;
680                                assertNotNull(order);
681                                assertFalse(order.isFulfilled());
682                                if (order.getMainDish()!=null) {
683                                        ++partialOrderDishes;
684                                }
685                                if (order.getSideDish()!=null) {
686                                        ++partialOrderDishes;
687                                }
688                                assertFalse(orderFulfiller.getFulfilledOrders().contains(order));
689                                ++unfulfilledOrders;
690                        }
691                        assertEquals(totalNumberOfOrders, fulfilledOrders+unfulfilledOrders);
692                        // Dishes still in the store must be unconsumed; the total must equal two per fulfilled order plus remaining plus partial.
693                        int remainingDishes = 0;
694                        Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> dishSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Dish.class);
695                        for (Handle<Object, Integer, Object, Long> h: bus.getStore().get(dishSelector)) {
696                                Dish dish = (Dish) h.getEvent();
697                                assertNotNull(dish);
698                                assertFalse(dish.isConsumed());
699                                ++remainingDishes;
700                        }
701                        assertEquals(totalNumberOfDishes, fulfilledOrders*2+remainingDishes+partialOrderDishes);
702                }                
703        }
704 
705        @Test
706        public void testPredicateChaining() throws InterruptedException {
707                for (StringBusFactory obf: createStringBus(null)) {
708                        LocalEventBus<String, Integer, Object> bus = obf.createBus();
709                
710                        LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);                
711                        PredicateChainingHandler handler = new PredicateChainingHandler();
712                        introspector.bind(handler, bus);
713                        
714                        if (!obf.isUseSimpleMatcher()) {
715                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\PredicateChainingHandler_"+obf.id+".dot")));
716                        }
717                        
718                        bus.post("Hello world!");
719                        bus.join();
720                        
721                        assertTrue(handler.isOneInvoked());
722                        assertTrue(handler.isTwoInvoked());
723                }
724        }
725        
726        @Test
727        public void testOppositeChaining() throws InterruptedException {
728                for (ObjectBusFactory obf: createObjectBus(null)) {
729                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
730                
731                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
732                        OppositeHandler handler = new OppositeHandler();
733                        introspector.bind(handler, bus);
734                        
735                        if (!obf.isUseSimpleMatcher()) {
736                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\OppositeHandler_"+obf.id+".dot")));
737                        }
738                        
739                        bus.post("Hello world!");
740                        bus.join();
741                        
742                        assertFalse(handler.isOneInvoked());
743                        assertTrue(handler.isTwoInvoked());
744                }
745        }
746        
747        @Test
748        public void testMoreRestrictive() throws InterruptedException {
749                for (ObjectBusFactory obf: createObjectBus(null)) {
750                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
751                
752                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
753                        MoreRestrictiveHandler handler = new MoreRestrictiveHandler();
754                        introspector.bind(handler, bus);
755                        
756                        if (!obf.isUseSimpleMatcher()) {
757                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\MoreRestrictiveHandler_"+obf.id+".dot")));
758                        }
759                        
760                        bus.post(Collections.singleton("Hello"));
761                        bus.join();
762                        
763                        assertTrue(handler.isCollectionInvoked());
764                        assertFalse(handler.isListInvoked());
765                }
766        }
767        
768        @Test
769        public void testLoop() throws InterruptedException {
770                DuplicatesFilter iFilter = new DuplicatesFilter();
771                for (ObjectBusFactory obf: createObjectBus(iFilter)) {
772                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
773                        
774                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
775                        LoopHandler handler = new LoopHandler();
776                        introspector.bind(handler, bus);
777                        
778                        if (!obf.isUseSimpleMatcher()) {
779                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\LoopHandler_"+obf.id+".dot")));
780                        }
781                        
782                        bus.post("Hello");
783                        bus.post("Good bye!");
784                        bus.join();
785                        
786                        assertTrue(handler.isHelloOk());
787                        assertEquals(1, handler.getHelloCounter());
788                        
789                        assertTrue(handler.isWorldOk());
790                        assertEquals(1, handler.getWorldCounter());
791                }
792        }
793        
794        @Test
795        public void testInifinite() throws InterruptedException {
796                InferenceFilter iFilter = new InferenceChainLengthFilter(100);
797                for (ObjectBusFactory obf: createObjectBus(iFilter)) {
798                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
799                
800                        bus.setMaxDerivationDepth(100);
801                        
802                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
803                        InfiniteHandler handler = new InfiniteHandler();
804                        introspector.bind(handler, bus);
805                        
806                        if (!obf.isUseSimpleMatcher()) {
807                                bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\InfiniteHandler_"+obf.id+".dot")));
808                        }
809                        
810                        Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
811                        handle.join();
812                }
813        }
814        
815        @Test(expected=EventDispatchException.class)
816        public void testFaulty() throws InterruptedException {
817                for (ObjectBusFactory obf: createObjectBus(null)) {
818                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
819                
820                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
821                        FaultyHandler handler = new FaultyHandler();
822                        introspector.bind(handler, bus);
823                        
824                        Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
825                        handle.join();
826                }
827        }
828 
829        @Test
830        public void testRemoveHandler() throws InterruptedException {
831                for (ObjectBusFactory obf: createObjectBus(null)) {
832                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
833                
834                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
835                        SimpleStringHandler handler = new SimpleStringHandler();
836                        Set<Long> keys = introspector.bind(handler, bus);
837                        
838                        Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
839                        handle.join();
840                        assertEquals(1, handler.getCounter());
841                        
842                        bus.removeHandlers(keys);
843                        
844                        SimpleStringHandler handler2 = new SimpleStringHandler();
845                        introspector.bind(handler2, bus);
846                        
847                        Handle<Object, Integer, Object, Long> handle2 = bus.post("Good bye!");
848                        handle2.join();
849                        assertEquals(1, handler.getCounter());
850                        assertEquals(1, handler2.getCounter());
851                }                
852        }
853        
854        @Test
855        public void testReset() throws InterruptedException {
856                for (ObjectBusFactory obf: createObjectBus(null)) {
857                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
858                
859                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);                
860                        StatefulHandler handler = new StatefulHandler();
861                        Set<Long> keys = introspector.bind(handler, bus);
862                        
863                        bus.post("Hello");
864                        bus.post("1");
865                        bus.join();
866                        
867                        assertEquals(2, handler.getState().size());
868                        int kbSize = 0;
869                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
870                                ++kbSize;
871                        }
872                        assertEquals(2, kbSize);
873                        assertEquals(0, handler.getResetCounter());
874                        
875                        bus.reset();
876                        
877                        assertEquals(0, handler.getState().size());
878                        kbSize = 0;
879                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
880                                ++kbSize;
881                        }
882                        assertEquals(0, kbSize);
883                        assertEquals(2, handler.getResetCounter());
884                }                                
885        }
886        
887        private static int predicateCounter; // NOTE(review): no use of this counter is visible in this part of the file - confirm it is still needed
888        
889        /**
890         * Helper interface for intantiate bus for tests.
891         * @author Pavel Vlasov
892         *
893         * @param <E>
894         * @param <P>
895         * @param <C>
896         */
897        private class ObjectBusFactory {
898                
899                private InferencePolicy inferencePolicy;
900                private boolean pkStore;
901                private boolean useExecutor;
902                private InferenceFilter iFilter;
903                private int id;
904                private boolean useSimpleMatcher;
905                private ObservableConverter<Object> observableConverter;
906 
907                public ObjectBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
908                        this.inferencePolicy = inferencePolicy;
909                        this.pkStore = pkStore;
910                        this.useExecutor = useExecutor;
911                        this.iFilter = iFilter;
912                        this.id = id;
913                        this.useSimpleMatcher = useSimpleMatcher;
914                }
915                
916                public boolean isUseExecutor() {
917                        return useExecutor;
918                }
919                
920                public boolean isUseSimpleMatcher() {
921                        return useSimpleMatcher;
922                }
923                
924                public InferencePolicy getInferencePolicy() {
925                        return inferencePolicy;
926                }
927                
928                @Override
929                public String toString() {
930                        return "ObjectBusFactory [inferencePolicy=" + inferencePolicy
931                                        + ", pkStore=" + pkStore + ", useExecutor=" + useExecutor
932                                        + ", iFilter=" + iFilter + ", id=" + id + ", useSimpleMatcher="+useSimpleMatcher+"]";
933                }
934                
935                public void setObservableConverter(ObservableConverter<Object> observableConverter) {
936                        this.observableConverter = observableConverter;
937                }
938 
939                LocalEventBus<Object, Integer, Object> createBus() {
940                        LocalEventStoreImpl.Config<Object, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<Object, Integer, Object>();
941                        if (!pkStore) {
942                                storeConfig.setPrimaryKeyExtractor(null);
943                        }
944                        if (useExecutor) {
945                                storeConfig.setExecutorService(executorService);
946                        }
947                        LocalEventStore<Object, Integer, Object> eventStore = new LocalEventStoreImpl<Object, Integer, Object>(storeConfig);
948                        
949                        LocalEventBus.Config<Object, Integer, Object> busConfig = new LocalEventBus.Config<Object, Integer, Object>();
950                        busConfig.setEventType(Object.class);
951                        busConfig.setStore(eventStore);
952                        if (useExecutor) {
953                                busConfig.setExecutorService(executorService);
954                        }
955                        busConfig.setInferencePolicy(inferencePolicy);
956                        busConfig.setInferenceFilter(iFilter);
957                        if (useSimpleMatcher) {
958                                busConfig.setMatcher(new LocalSimpleMatcher<Object, Integer, Object, LocalEventStore<Object,Integer,Object>>());
959                        }
960                        
961                        busConfig.setObservableConverter(observableConverter);
962                        
963                        return new LocalEventBus<Object, Integer, Object>(busConfig) {
964                                
965//                                /**
966//                                 * For troubleshooting.
967//                                 */
968//                                public Long addHandler(
969//                                                EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
970//                                                LocalEventStore<Object,Integer,Object>> eventHandler, 
971//                                                boolean oneOff, 
972//                                                Predicate<Object,Object>... predicates) {
973        //
974//                                        String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
975//                                        for (int i=0; i<predicates.length; ++i) {
976//                                                PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));                        
977//                                        }
978//                                        
979//                                        return super.addHandler(eventHandler, oneOff, predicates);
980//                                }
981 
982                        };
983                }
984                
985        }
986        /**
987         * Helper interface for instantiate bus for tests.
988         * @author Pavel Vlasov
989         *
990         * @param <E>
991         * @param <P>
992         * @param <C>
993         */
994        private class StringBusFactory {
995                
996                private InferencePolicy inferencePolicy;
997                private boolean pkStore;
998                private boolean useExecutor;
999                private InferenceFilter iFilter;
1000                private int id;
1001                private boolean useSimpleMatcher;
1002 
1003                public StringBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
1004                        this.inferencePolicy = inferencePolicy;
1005                        this.pkStore = pkStore;
1006                        this.useExecutor = useExecutor;
1007                        this.iFilter = iFilter;
1008                        this.id = id;
1009                        this.useSimpleMatcher = useSimpleMatcher;
1010                }
1011                
1012                public boolean isUseExecutor() {
1013                        return useExecutor;
1014                }
1015                
1016                public boolean isUseSimpleMatcher() {
1017                        return useSimpleMatcher;
1018                }
1019                
1020                LocalEventBus<String, Integer, Object> createBus() {
                            // Builds a fresh event store and a LocalEventBus over String events from the flags
                            // captured by the constructor. Each call creates new store and bus instances.
1021                        LocalEventStoreImpl.Config<String, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<String, Integer, Object>();
1022                        if (!pkStore) {
                                    // Explicitly clear the primary key extractor.
                                    // NOTE(review): presumably the config supplies a default extractor otherwise -- confirm.
1023                                storeConfig.setPrimaryKeyExtractor(null);
1024                        }
1025                        if (useExecutor) {
1026                                storeConfig.setExecutorService(executorService);
1027                        }
1028                        LocalEventStore<String, Integer, Object> eventStore = new LocalEventStoreImpl<String, Integer, Object>(storeConfig);
1029                        
1030                        LocalEventBus.Config<String, Integer, Object> busConfig = new LocalEventBus.Config<String, Integer, Object>();
1031                        busConfig.setEventType(String.class);
1032                        busConfig.setStore(eventStore);
1033                        if (useExecutor) {
                                    // The same executorService is given to the store (above) and to the bus.
1034                                busConfig.setExecutorService(executorService);
1035                        }
1036                        busConfig.setInferencePolicy(inferencePolicy);
1037                        busConfig.setInferenceFilter(iFilter);
1038                        if (useSimpleMatcher) {
                                    // When false, no matcher is set here and the bus config keeps whatever it had.
1039                                busConfig.setMatcher(new LocalSimpleMatcher<String, Integer, Object, LocalEventStore<String,Integer,Object>>());
1040                        }
1041                        
                            // Anonymous subclass with no active members: its only content is the
                            // commented-out troubleshooting override of addHandler kept below.
1042                        return new LocalEventBus<String, Integer, Object>(busConfig) {
1043                                
1044//                                /**
1045//                                 * For troubleshooting.
1046//                                 */
1047//                                public Long addHandler(
1048//                                                EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
1049//                                                LocalEventStore<Object,Integer,Object>> eventHandler, 
1050//                                                boolean oneOff, 
1051//                                                Predicate<Object,Object>... predicates) {
1052        //
1053//                                        String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
1054//                                        for (int i=0; i<predicates.length; ++i) {
1055//                                                PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));                        
1056//                                        }
1057//                                        
1058//                                        return super.addHandler(eventHandler, oneOff, predicates);
1059//                                }
1060 
1061                        };
1062                }
1063                
1064        }
1065 
1066        private Iterable<ObjectBusFactory> createObjectBus(InferenceFilter iFilter) {
                    // Returns eight ObjectBusFactory configurations for every InferencePolicy value.
                    // The two boolean arguments after iFilter cover all four true/false combinations;
                    // the last argument is true for the first group of four and false for the second.
                    // NOTE(review): argument meaning presumably mirrors StringBusFactory
                    // (pkStore, useExecutor, id, useSimpleMatcher) -- confirm against ObjectBusFactory.
1067                int counter = 0; // pre-incremented per factory, so ids run 1, 2, 3, ... across all policies
1068                ArrayList<ObjectBusFactory> ret = new ArrayList<ObjectBusFactory>();
1069                for (InferencePolicy ip: InferencePolicy.values()) {
1070                        ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, true));
1071                        ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, true));
1072                        ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, true));
1073                        ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, true));
1074                        
1075                        ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, false));
1076                        ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, false));
1077                        ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, false));
1078                        ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, false));
1079                }
1080                return ret;
1081        }
1082 
1083        private Iterable<StringBusFactory> createStringBus(InferenceFilter iFilter) {
                    // Returns eight StringBusFactory configurations for every InferencePolicy value:
                    // all four (pkStore, useExecutor) combinations with useSimpleMatcher=true,
                    // then the same four with useSimpleMatcher=false. iFilter is passed through unchanged.
1084                int counter = 0; // pre-incremented per factory, so ids run 1, 2, 3, ... across all policies
1085                ArrayList<StringBusFactory> ret = new ArrayList<StringBusFactory>();
1086                for (InferencePolicy ip: InferencePolicy.values()) {
1087                        ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, true));
1088                        ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, true));
1089                        ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, true));
1090                        ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, true));
1091                        
1092                        ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, false));
1093                        ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, false));
1094                        ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, false));
1095                        ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, false));
1096                }
1097                return ret;
1098        }
1099        
1100        @Test
            // Runs _testFamilyTies for every InferencePolicy except IMMEDIATELY, covering all four
            // (useExecutor, pkStore) combinations first with the simple matcher and then without it.
            // counter gives every run a distinct id, which _testFamilyTies uses in snapshot file names.
1101        public void testFamilyTies() throws InterruptedException {
1102                int counter = 0;
1103                for (InferencePolicy ip: InferencePolicy.values()) {
                            // NOTE(review): the reason IMMEDIATELY is excluded is not stated in this file section.
1104                        if (ip==InferencePolicy.IMMEDIATELY) {
1105                                continue;
1106                        }
1107                        
1108                        _testFamilyTies(ip, false, false, ++counter, true);
1109                        _testFamilyTies(ip, false, true, ++counter, true);
1110                        _testFamilyTies(ip, true, false, ++counter, true);
1111                        _testFamilyTies(ip, true, true, ++counter, true);
1112                        
1113                        _testFamilyTies(ip, false, false, ++counter, false);
1114                        _testFamilyTies(ip, false, true, ++counter, false);
1115                        _testFamilyTies(ip, true, false, ++counter, false);
1116                        _testFamilyTies(ip, true, true, ++counter, false);                        
1117                }
1118        }
1119        
1120        private void _testFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {                
                    // One Family Ties scenario: builds a FamilyTiesEventBus from the given flags, binds the
                    // rule objects through FamilyTiesIntrospector, posts seed Spouse/Child facts, waits with
                    // bus.join() and asserts that the store then reports 11 relatives for Lisa.
                    //   iPolicy          - inference policy set on the bus config
                    //   useExecutor      - when true, executorService is set on both store and bus configs
                    //   pkStore          - when false, the store's primary key extractor is set to null
                    //   id               - used only to make the snapshot file names unique
                    //   useSimpleMatcher - when true, a LocalSimpleMatcher is installed and no snapshots are taken
1121                // Create bus
1122                FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1123                if (useExecutor) {
1124                        storeConfig.setExecutorService(executorService);
1125                }
1126                if (!pkStore) {
1127                        storeConfig.setPrimaryKeyExtractor(null);
1128                }
1129                FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1130                
1131                FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1132                busConfig.setStore(eventStore);
1133                if (useExecutor) {
1134                        busConfig.setExecutorService(executorService);
1135                }
1136                if (useSimpleMatcher) {
1137                        busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1138                }
1139                
1140                busConfig.setInferencePolicy(iPolicy);
1141                busConfig.setInferenceFilter(new DuplicatesFilter());
1142                
1143                FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1144                
1145                FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1146                
1147                // Bind rules.
1148                introspector.bind(new DaughterRule(), bus);
1149                introspector.bind(new GrandRules(), bus);
1150                introspector.bind(new ParentChildRules(), bus);
1151                introspector.bind(new ParentRules(), bus);
1152                introspector.bind(new SecondaryRules(), bus);
1153                introspector.bind(new SiblingRules(), bus);
1154                introspector.bind(new SonRule(), bus);
1155                introspector.bind(new SpouseRules(), bus);
1156                
1157                if (!useSimpleMatcher) {
1158                        // Take dot snapshot
                            // NOTE(review): the path hard-codes a backslash, which is a directory separator only on Windows.
1159                        bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_"+id+".dot")));
1160                }
1161 
1162                // Post seed relationships
                    // The Person arguments are (name, number, boolean); presumably age and a male flag -- confirm against Person.
1163                Person kate = new Person("Kate", 58, false);
1164                Person victor = new Person("Victor", 63, true);
1165                bus.post(new Spouse(kate, victor));
1166 
1167                Person peter = new Person("Peter", 37, true);
1168                bus.post(new Child(peter, kate));
1169                bus.post(new Child(peter, victor));
1170                
1171                Person alison = new Person("Alison", 36, false);
1172                bus.post(new Spouse(peter, alison));
1173 
1174                Person lucy = new Person("Lucy", 17, false);
1175                bus.post(new Child(lucy, alison));
1176                
1177                Person nancy = new Person("Nancy", 14, false);
1178                bus.post(new Child(nancy, peter));
1179                
1180                Person dan = new Person("Dan", 7, true);
1181                bus.post(new Child(dan, peter));
1182                bus.post(new Child(dan, alison));
1183                
1184                Person audrey = new Person("Audrey", 4, false);
1185                bus.post(new Child(audrey, peter));
1186                bus.post(new Child(audrey, alison));                
1187                
1188                Person tanya = new Person("Tanya", 31, false);
1189                Person max = new Person("Max", 32, true);
1190                bus.post(new Spouse(tanya, max));
1191                bus.post(new Child(tanya, kate));
1192                bus.post(new Child(tanya, victor));
1193 
1194                Person vilma = new Person("Vilma", 14, false);
1195                bus.post(new Child(vilma, tanya));
1196                
1197                Person george = new Person("George", 10, true);
1198                bus.post(new Child(george, tanya));
1199                
1200                Person lisa = new Person("Lisa", 5, false);
1201                bus.post(new Child(lisa, tanya));
1202                bus.post(new Child(lisa, max));                
1203                                
                    // Wait on the bus before inspecting the store; presumably blocks until pending work is done -- confirm.
1204                bus.join();
1205                
1206                // Take browser snapshot
1207                if (!useSimpleMatcher) {
1208                        SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\FamilyTies_"+id+".snapshot"));
1209                        bus.takeSnapshot(emfSnapshot);
1210                }
1211                
                    // getRelatives(lisa) is only iterated here, so the relatives are counted by walking it.
1212                int numberOfLisaRelatives = 0;
1213                for (Relative r: bus.getStore().getRelatives(lisa)) {
1214                        ++numberOfLisaRelatives;
1215                }
1216                assertEquals(11, numberOfLisaRelatives);
1217                
                    // Diagnostic output only; reached only when the assertion above passed.
1218                for (Relative relative: bus.getStore().getRelatives(lisa)) {
1219                        System.out.println(relative);
1220                }
1221        }
1222        
1223        @Test
            // Same parameter sweep as testFamilyTies, but delegating to _testCompiledFamilyTies:
            // every InferencePolicy except IMMEDIATELY, all four (useExecutor, pkStore) combinations,
            // first with the simple matcher and then without it. counter supplies a distinct id per run.
1224        public void testCompiledFamilyTies() throws InterruptedException {
1225                int counter = 0;
1226                for (InferencePolicy ip: InferencePolicy.values()) {
                            // NOTE(review): the reason IMMEDIATELY is excluded is not stated in this file section.
1227                        if (ip==InferencePolicy.IMMEDIATELY) {
1228                                continue;
1229                        }
1230                        _testCompiledFamilyTies(ip, false, false, ++counter, true);
1231                        _testCompiledFamilyTies(ip, false, true, ++counter, true);
1232                        _testCompiledFamilyTies(ip, true, false, ++counter, true);
1233                        _testCompiledFamilyTies(ip, true, true, ++counter, true);
1234 
1235                        _testCompiledFamilyTies(ip, false, false, ++counter, false);
1236                        _testCompiledFamilyTies(ip, false, true, ++counter, false);
1237                        _testCompiledFamilyTies(ip, true, false, ++counter, false);
1238                        _testCompiledFamilyTies(ip, true, true, ++counter, false);
1239                }
1240        }
1241        
1242        private void _testCompiledFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {                
                    // One Family Ties scenario using the *JavaBinder classes instead of FamilyTiesIntrospector
                    // to bind the rules: builds the bus from the given flags, posts seed Spouse/Child facts,
                    // waits with bus.join() and asserts that the store then reports 11 relatives for Lisa.
                    //   iPolicy          - inference policy set on the bus config
                    //   useExecutor      - when true, executorService is set on both store and bus configs
                    //   pkStore          - when false, the store's primary key extractor is set to null
                    //   id               - makes both snapshot file names unique per run
                    //   useSimpleMatcher - when true, a LocalSimpleMatcher is installed and no snapshots are taken
1243                // Create bus
1244                // (event store first, then the bus config that references it)
1245                FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1246                if (useExecutor) {
1247                        storeConfig.setExecutorService(executorService);
1248                }
1249                if (!pkStore) {
1250                        storeConfig.setPrimaryKeyExtractor(null);
1251                }
1252                FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1253                
1254                FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1255                busConfig.setStore(eventStore);
1256                if (useExecutor) {
1257                        busConfig.setExecutorService(executorService);
1258                }
1259                busConfig.setInferencePolicy(iPolicy);
1260                busConfig.setInferenceFilter(new DuplicatesFilter());
1261                
1262                if (useSimpleMatcher) {
1263                        busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1264                }                
1265                
1266                FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1267                
1268                // Bind rules.
1269                new DaughterRuleJavaBinder().bind(new DaughterRule(), bus);
1270                new GrandRulesJavaBinder().bind(new GrandRules(), bus);
1271                new ParentChildRulesJavaBinder().bind(new ParentChildRules(), bus);
1272                new ParentRulesJavaBinder().bind(new ParentRules(), bus);
1273                new SecondaryRulesJavaBinder().bind(new SecondaryRules(), bus);
1274                new SiblingRulesJavaBinder().bind(new SiblingRules(), bus);
1275                new SonRuleJavaBinder().bind(new SonRule(), bus);
1276                new SpouseRulesJavaBinder().bind(new SpouseRules(), bus);
1277                
1278                if (!useSimpleMatcher) {
1279                        // Take dot snapshot
1280                        bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\CompiledFamilyTies_"+id+".dot")));
1281                }
1282 
1283                // Post seed relationships
1284                Person kate = new Person("Kate", 58, false);
1285                Person victor = new Person("Victor", 63, true);
1286                bus.post(new Spouse(kate, victor));
1287 
1288                Person peter = new Person("Peter", 37, true);
1289                bus.post(new Child(peter, kate));
1290                bus.post(new Child(peter, victor));
1291                
1292                Person alison = new Person("Alison", 36, false);
1293                bus.post(new Spouse(peter, alison));
1294 
1295                Person lucy = new Person("Lucy", 17, false);
1296                bus.post(new Child(lucy, alison));
1297                
1298                Person nancy = new Person("Nancy", 14, false);
1299                bus.post(new Child(nancy, peter));
1300                
1301                Person dan = new Person("Dan", 7, true);
1302                bus.post(new Child(dan, peter));
1303                bus.post(new Child(dan, alison));
1304                
1305                Person audrey = new Person("Audrey", 4, false);
1306                bus.post(new Child(audrey, peter));
1307                bus.post(new Child(audrey, alison));                
1308                
1309                Person tanya = new Person("Tanya", 31, false);
1310                Person max = new Person("Max", 32, true);
1311                bus.post(new Spouse(tanya, max));
1312                bus.post(new Child(tanya, kate));
1313                bus.post(new Child(tanya, victor));
1314 
1315                Person vilma = new Person("Vilma", 14, false);
1316                bus.post(new Child(vilma, tanya));
1317                
1318                Person george = new Person("George", 10, true);
1319                bus.post(new Child(george, tanya));
1320                
1321                Person lisa = new Person("Lisa", 5, false);
1322                bus.post(new Child(lisa, tanya));
1323                bus.post(new Child(lisa, max));                
1324                                
1325                bus.join();
1326                
1327                if (!useSimpleMatcher) {
1328                        // Take browser snapshot; the id keeps runs from overwriting each other's file, matching the .dot snapshot above
1329                        SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledFamilyTies_"+id+".snapshot"));
1330                        bus.takeSnapshot(emfSnapshot);
1331                }
1332                
1333                int numberOfLisaRelatives = 0;
1334                for (Relative r: bus.getStore().getRelatives(lisa)) {
1335                        ++numberOfLisaRelatives;
1336                }
1337                assertEquals(11, numberOfLisaRelatives);
1338                
1339                for (Relative relative: bus.getStore().getRelatives(lisa)) {
1340                        System.out.println(relative);
1341                }
1342        }
1343        
1344        @Test
            // Runs the Family Ties scenario with a JmxStatsCollector registered on the platform MBean server.
            // The method contains no assertions; it only fails if something throws.
            // It sleeps 40 s before binding rules, 2 s after every post and 30 s before join().
            // NOTE(review): the pauses presumably leave time to watch the statistics in a JMX console -- confirm.
1345        public void testFamilyTiesJmx() throws InterruptedException {        
1346                
1347                // Create bus
                    // Unlike _testFamilyTies, the executor is always used and neither an inference policy nor a matcher is set.
1348                FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1349                storeConfig.setExecutorService(executorService);
1350                FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1351                
1352                FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1353                busConfig.setStore(eventStore);
1354                busConfig.setExecutorService(executorService);
1355                busConfig.setInferenceFilter(new DuplicatesFilter());
1356                
1357                JmxStatsCollector jsc = new JmxStatsCollector(ManagementFactory.getPlatformMBeanServer(), "Hammurapi Group:root=Family Ties"); 
1358                busConfig.setStatsCollector(jsc);
1359                
1360                FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1361                
1362                Thread.sleep(40000);
1363                
1364                FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1365                
1366                // Bind rules.
1367                introspector.bind(new DaughterRule(), bus);
1368                introspector.bind(new GrandRules(), bus);
1369                introspector.bind(new ParentChildRules(), bus);
1370                introspector.bind(new ParentRules(), bus);
1371                introspector.bind(new SecondaryRules(), bus);
1372                introspector.bind(new SiblingRules(), bus);
1373                introspector.bind(new SonRule(), bus);
1374                introspector.bind(new SpouseRules(), bus);
1375                
1376                // Take dot snapshot
1377                bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_Jmx.dot")));                
1378 
1379                // Post seed relationships
                    // Same facts, in the same order, as _testFamilyTies, with a 2 s pause after each post.
1380                Person kate = new Person("Kate", 58, false);
1381                Person victor = new Person("Victor", 63, true);
1382                bus.post(new Spouse(kate, victor));
1383                Thread.sleep(2000);
1384 
1385                Person peter = new Person("Peter", 37, true);
1386                bus.post(new Child(peter, kate));
1387                Thread.sleep(2000);
1388                bus.post(new Child(peter, victor));
1389                Thread.sleep(2000);
1390                
1391                Person alison = new Person("Alison", 36, false);
1392                bus.post(new Spouse(peter, alison));
1393                Thread.sleep(2000);
1394 
1395                Person lucy = new Person("Lucy", 17, false);
1396                bus.post(new Child(lucy, alison));
1397                Thread.sleep(2000);
1398                
1399                Person nancy = new Person("Nancy", 14, false);
1400                bus.post(new Child(nancy, peter));
1401                Thread.sleep(2000);
1402                
1403                Person dan = new Person("Dan", 7, true);
1404                bus.post(new Child(dan, peter));
1405                Thread.sleep(2000);
1406                bus.post(new Child(dan, alison));
1407                Thread.sleep(2000);
1408                
1409                Person audrey = new Person("Audrey", 4, false);
1410                bus.post(new Child(audrey, peter));
1411                Thread.sleep(2000);
1412                bus.post(new Child(audrey, alison));                
1413                Thread.sleep(2000);
1414                
1415                Person tanya = new Person("Tanya", 31, false);
1416                Person max = new Person("Max", 32, true);
1417                bus.post(new Spouse(tanya, max));
1418                Thread.sleep(2000);
1419                bus.post(new Child(tanya, kate));
1420                Thread.sleep(2000);
1421                bus.post(new Child(tanya, victor));
1422                Thread.sleep(2000);
1423 
1424                Person vilma = new Person("Vilma", 14, false);
1425                bus.post(new Child(vilma, tanya));
1426                Thread.sleep(2000);
1427                
1428                Person george = new Person("George", 10, true);
1429                bus.post(new Child(george, tanya));
1430                Thread.sleep(2000);
1431                
1432                Person lisa = new Person("Lisa", 5, false);
1433                bus.post(new Child(lisa, tanya));
1434                Thread.sleep(2000);
1435                bus.post(new Child(lisa, max));                
1436                Thread.sleep(2000);
1437                                
1438                Thread.sleep(30000);
1439                
1440                bus.join();
1441                
1442        }
1443        
1444        // --- Remove tests ---
1445        
1446        /**
1447         * - Post event to the bus
1448         * - Check presence
1449         * - Remove event using bus.remove()
1450         * - Check that event is gone from the store.
             *
             * Repeated for every bus configuration returned by createObjectBus(null).
             * After "World" is posted the store must hold exactly "Hello", "World" and "!";
             * after "World" is removed the store must be empty.
1451         */
1452        @Test
1453        public void testRemoveWithBus() throws Exception {
1454                for (ObjectBusFactory obf: createObjectBus(null)) {
1455                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1456                        
1457                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1458                        
1459                        PostingHandler ph = new PostingHandler();
1460                        introspector.bind(ph, bus);
1461                                                
1462                        bus.post("World");
1463                        bus.join();
1464                        
                            // Each of the handler's two counters must be exactly 1 after the single post.
1465                        assertEquals(1, ph.getWorldCounter());
1466                        assertTrue(ph.isWorldOk());
1467                        
1468                        assertEquals(1, ph.getEmCounter());
1469                        assertTrue(ph.isEmOk());
1470                        
                            // "Hello" and "!" were not posted by the test itself;
                            // presumably PostingHandler posts them in response to "World" -- confirm.
1471                        Set<String> expected = new HashSet<String>();
1472                        expected.add("Hello");
1473                        expected.add("World");
1474                        expected.add("!");
1475                                                                        
                            // Every stored event must be one of the expected strings, each at most once.
1476                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1477                                if (!expected.remove(h.getEvent())) {
1478                                        fail("Unexpected object in the store: "+h.getEvent());
1479                                }
1480                        }
1481                        
1482                        if (!expected.isEmpty()) {
1483                                fail("Not present in the store: "+expected);
1484                        }
1485                        
1486                        bus.remove("World");
1487                        bus.join();
1488                        
                            // Any remaining element fails the test, i.e. "Hello" and "!" must be gone too.
1489                        for (Object o: bus.getStore()) {
1490                                fail("Unexpected object in the store: "+o);
1491                        }                        
1492                }
1493        }
1494        
1495        /**
1496         * - Post event to the bus
1497         * - Check presence
1498         * - Remove event using Handle.remove()
1499         * - Check that event is gone from the store.
             *
             * Repeated for every bus configuration returned by createObjectBus(null).
             * Same checks as testRemoveWithBus, except that the removal goes through the
             * handle returned by bus.post("World") instead of bus.remove("World").
1500         */
1501        @Test
1502        public void testRemoveWithHandle() throws Exception {
1503                for (ObjectBusFactory obf: createObjectBus(null)) {
1504                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1505                        
1506                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1507                        
1508                        PostingHandler ph = new PostingHandler();
1509                        introspector.bind(ph, bus);
1510                                                
                            // Keep the handle so the event can be removed through it later.
1511                        Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1512                        bus.join();
1513                        
1514                        assertEquals(1, ph.getWorldCounter());
1515                        assertTrue(ph.isWorldOk());
1516                        
1517                        assertEquals(1, ph.getEmCounter());
1518                        assertTrue(ph.isEmOk());
1519                        
                            // "Hello" and "!" were not posted by the test itself;
                            // presumably PostingHandler posts them in response to "World" -- confirm.
1520                        Set<String> expected = new HashSet<String>();
1521                        expected.add("Hello");
1522                        expected.add("World");
1523                        expected.add("!");
1524                                                
1525                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1526                                if (!expected.remove(h.getEvent())) {
1527                                        fail("Unexpected object in the store: "+h.getEvent());
1528                                }
1529                        }
1530                        
1531                        if (!expected.isEmpty()) {
1532                                fail("Not present in the store: "+expected);
1533                        }
1534                        
1535                        wHandle.remove();
1536                        bus.join();
1537                        
                            // Any remaining element fails the test, i.e. "Hello" and "!" must be gone too.
1538                        for (Object o: bus.getStore()) {
1539                                fail("Unexpected object in the store: "+o);
1540                        }                        
1541                }
1542        }
1543                
1544        /**
1545         * - Post event
1546         * - Validate that remove handler not fired
1547         * - Remove event
1548         * - Validate that remove handler fired
             *
             * Repeated for every bus configuration returned by createObjectBus(null).
             * After the removal the PostingHandler counters must still be 1, both RemoveHandler
             * counters must be 1, and the store must contain exactly one event, "Bye!".
1549         * @throws Exception
1550         */
1551        @Test
1552        public void testRemoveHandlers() throws Exception {
1553                for (ObjectBusFactory obf: createObjectBus(null)) {
1554                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();                        
1555                        
1556                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1557                        
1558                        PostingHandler ph = new PostingHandler();
1559                        introspector.bind(ph, bus);
1560                        
1561                        RemoveHandler rh = new RemoveHandler();
1562                        introspector.bind(rh, bus);                        
                            // Snapshots are written only for configurations that do not use the simple matcher.
1563                        if (!obf.useSimpleMatcher) {
1564                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandlers_"+obf.id+".snapshot"));
1565                                bus.takeSnapshot(emfSnapshot);
1566                                
1567                                takeSnapshot(bus, "snapshots\\RemoveHandlers_"+obf.id+".dot");
1568                        }                        
1569                                                
1570                        Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1571                        bus.join();
1572                        
1573                        assertEquals(1, ph.getWorldCounter());
1574                        assertTrue(ph.isWorldOk());
1575                        
1576                        assertEquals(1, ph.getEmCounter());
1577                        assertTrue(ph.isEmOk());
1578                        
                            // Nothing has been removed yet, so the remove handler must not have fired.
1579                        assertEquals(0, rh.getWorldCounter());                        
1580                        assertEquals(0, rh.getEmCounter());
1581                        
1582                        Set<String> expected = new HashSet<String>();
1583                        expected.add("Hello");
1584                        expected.add("World");
1585                        expected.add("!");
1586                                                
1587                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1588                                if (!expected.remove(h.getEvent())) {
1589                                        fail("Unexpected object in the store: "+h.getEvent());
1590                                }
1591                        }
1592                        
1593                        if (!expected.isEmpty()) {
1594                                fail("Not present in the store: "+expected);
1595                        }
1596                        
1597                        wHandle.remove();
1598                        bus.join();
1599                        
                            // Only "Bye!" may remain; presumably RemoveHandler posts it when it fires -- confirm.
                            // counter is checked last, with the factory as the failure message.
1600                        int counter=0;
1601                        for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1602                                ++counter;
1603                                if (!"Bye!".equals(h.getEvent())) {
1604                                                fail("Unexpected object in the store: "+h.getEvent());
1605                                }
1606                        }                                
                            // The posting handler must not have fired again because of the removal.
1607                        assertEquals(1, ph.getWorldCounter());
1608                        assertTrue(ph.isWorldOk());
1609                        
1610                        assertEquals(1, ph.getEmCounter());
1611                        assertTrue(ph.isEmOk());
1612                        
1613                        assertEquals(1, rh.getWorldCounter());                        
1614                        assertTrue(rh.isWorldOk());
1615                        
1616                        assertEquals(1, rh.getEmCounter());
1617                        assertTrue(rh.isEmOk());
1618                        
1619                        assertEquals(String.valueOf(obf), 1, counter);                
1620                }
1621        }
1622        
1623        /**
1624         * - Post 1st event, validate not fired
1625         * - Post 2nd event, validate not fired
1626         * - Remove 2nd event, validate fired
1627         * - Remove 1st event, validate not fired 
1628         * @throws Exception
1629         */
1630        @Test
1631        public void testFireJoinRemoveHandler() throws Exception {
1632                for (ObjectBusFactory obf: createObjectBus(null)) {
1633                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();                        
1634                        
1635                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1636                        
1637                        JoinRemoveHandler jrh = new JoinRemoveHandler();
1638                        introspector.bind(jrh, bus);
1639                        
1640                        if (!obf.useSimpleMatcher) {
1641                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\JoinRemoveHandler_"+obf.id+".snapshot"));
1642                                bus.takeSnapshot(emfSnapshot);
1643                                
1644                                takeSnapshot(bus, "snapshots\\JoinRemoveHandler_"+obf.id+".dot");
1645                        }                        
1646                                                
1647                        bus.post("World");
1648                        bus.join();
1649                        
1650                        assertEquals(0, jrh.getJoinCounter());
1651                        
1652                        bus.post("Hello");
1653                        bus.join();
1654                        
1655                        assertEquals(0, jrh.getJoinCounter());
1656                        
1657                        bus.remove("Hello");
1658                        bus.join();
1659                        
1660                        assertEquals(1, jrh.getJoinCounter());
1661                        assertTrue(jrh.isJoinOk());
1662                        
1663                        bus.remove("World");
1664                        bus.join();
1665                        
1666                        assertEquals(1, jrh.getJoinCounter());
1667                        assertTrue(jrh.isJoinOk());                        
1668                }
1669        }
1670        
1671        /**
1672         * - Post event which matcher remove handler 1
1673         * - Update event with value which matches remove handler 2
1674         * - Verify that remove handler 1 is fired
1675         * - Remove event
1676         * - Verify that remove handler 2 is fired
1677         * @throws Exception
1678         */
1679        @Test
1680        public void testUpdateFireRemoveHandlers() throws Exception {
1681                for (ObjectBusFactory obf: createObjectBus(null)) {
1682                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1683                        if (obf.pkStore) {
1684                                continue; // No PK stores - events are mutable.
1685                        }
1686                        
1687                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1688                        
1689                        RemoveHandler2 rh = new RemoveHandler2();
1690                        introspector.bind(rh, bus);
1691                        
1692                        HelperHandler hh = new HelperHandler();
1693                        introspector.bind(hh, bus);
1694                        
1695                        if (!obf.useSimpleMatcher) {
1696                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1697                                bus.takeSnapshot(emfSnapshot);
1698                                
1699                                takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1700                        }                        
1701                        
1702                        AtomicReference<String> aRef = new AtomicReference<String>("World");
1703                        
1704                        rh.setExpectedWorld(aRef);
1705                                                
1706                        Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1707                        bus.join();
1708                        
1709                        assertEquals(0, rh.getWorldCounter());
1710                        assertEquals(0, rh.getEmCounter());
1711                        
1712                        assertEquals(1, hh.getWorldCounter());
1713                        assertTrue(hh.isWorldOk());
1714                        
1715                        assertEquals(0, hh.getEmCounter());
1716                        
1717                        aRef.set("!");
1718                        bus.join();
1719                        assertEquals(0, rh.getWorldCounter());
1720                        assertEquals(0, rh.getEmCounter());
1721                        
1722                        assertEquals(1, hh.getWorldCounter());
1723                        assertTrue(hh.isWorldOk());
1724                        
1725                        assertEquals(0, hh.getEmCounter());
1726                        
1727                        h.update();
1728                        bus.join();
1729                        assertEquals(1, rh.getWorldCounter());
1730                        assertTrue(rh.isWorldOk());                        
1731                        assertEquals(0, rh.getEmCounter());
1732                        
1733                        assertEquals(1, hh.getWorldCounter());
1734                        assertTrue(hh.isWorldOk());
1735                        
1736                        assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1737                        assertTrue(hh.isEmOk());
1738                        
1739                        h.remove();
1740                        bus.join();
1741                        assertEquals(1, rh.getWorldCounter());
1742                        assertTrue(rh.isWorldOk());                        
1743                        assertEquals(1, rh.getEmCounter());
1744                        assertTrue(rh.isEmOk());
1745                        
1746                        assertEquals(1, hh.getWorldCounter());
1747                        assertTrue(hh.isWorldOk());
1748                        
1749                        assertEquals(1, hh.getEmCounter());
1750                        assertTrue(hh.isEmOk());
1751                        
1752                }
1753        }
1754        
1755        /**
1756         * Update event through dispatch context. Verify that remove
1757         * handlers for old value are fired and that post handlers for 
1758         * new value are fired. Use join handler or AFTER_EVENT policy.
1759         * @throws Exception
1760         */
1761        @Test
1762        public void testUpdateFromHandler() throws Exception {
1763                for (ObjectBusFactory obf: createObjectBus(null)) {
1764                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1765                        if (obf.pkStore) {
1766                                continue; // No PK stores - events are mutable.
1767                        }
1768                        
1769                        if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
1770                                continue;
1771                        }
1772                        
1773                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1774                        
1775                        RemoveHandler2 rh = new RemoveHandler2();
1776                        introspector.bind(rh, bus);
1777                        
1778                        HelperHandler2 hh = new HelperHandler2();
1779                        introspector.bind(hh, bus);
1780                        
1781                        if (!obf.useSimpleMatcher) {
1782                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1783                                bus.takeSnapshot(emfSnapshot);
1784                                
1785                                takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1786                        }                        
1787                        
1788                        AtomicReference<String> aRef = new AtomicReference<String>("World");
1789                        
1790                        rh.setExpectedWorld(aRef);
1791                                                
1792                        Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1793                        bus.join();
1794                        
1795                        assertEquals(0, rh.getWorldCounter());
1796                        assertEquals(0, rh.getEmCounter());
1797                        
1798                        assertEquals(1, hh.getWorldCounter());
1799                        assertTrue(hh.isWorldOk());
1800                        
1801                        assertEquals(0, hh.getEmCounter());
1802                                                
1803                        bus.post("Hello"); // Hello fires join handler with World, which updates World to !
1804                        bus.join();
1805                        assertEquals(1, hh.getJoinCounter());
1806                        assertTrue(hh.isJoinOk());
1807                        
1808                        assertEquals(1, rh.getWorldCounter());
1809                        assertTrue(rh.isWorldOk());                        
1810                        assertEquals(0, rh.getEmCounter());
1811                        
1812                        assertEquals(1, hh.getWorldCounter());
1813                        assertTrue(hh.isWorldOk());
1814                        
1815                        assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1816                        assertTrue(hh.isEmOk());
1817                        
1818                        h.remove();
1819                        bus.join();
1820                        assertEquals(1, rh.getWorldCounter());
1821                        assertTrue(rh.isWorldOk());                        
1822                        assertEquals(1, rh.getEmCounter());
1823                        assertTrue(rh.isEmOk());
1824                        
1825                        assertEquals(1, hh.getWorldCounter());
1826                        assertTrue(hh.isWorldOk());
1827                        
1828                        assertEquals(1, hh.getEmCounter());
1829                        assertTrue(hh.isEmOk());
1830                        
1831                }
1832        }
1833        
1834        public static class ObservableStringReference implements Observable<ObservableStringReference> {
1835                
1836                public ObservableStringReference(String value) {
1837                        super();
1838                        this.value = value;
1839                }
1840 
1841                private Collection<Observer<? super ObservableStringReference>> observers = new ArrayList<Observer<? super ObservableStringReference>>();
1842 
1843                @Override
1844                public synchronized void addObserver(Observer<? super ObservableStringReference> o) {                                        
1845                        observers.add(o);
1846                }
1847 
1848                @Override
1849                public synchronized void deleteObserver(Observer<? super ObservableStringReference> o) {
1850                        observers.remove(o);
1851                }
1852                
1853                public Collection<Observer<? super ObservableStringReference>> getObservers() {
1854                        return observers;
1855                }
1856                
1857                private volatile String value;
1858                
1859                public synchronized void set(String value) {
1860                        this.value = value;
1861                        for (Observer<? super ObservableStringReference> o: observers) {
1862                                o.update(this);
1863                        }
1864                }
1865                
1866                public String get() {
1867                        return value;
1868                }
1869 
1870                
1871        };
1872        
1873        /**
1874         * - Post event which matches handler 1, verify fired
1875         * - Update event to match handler 2, verify that handler 2 is fired
1876         * @throws Exception
1877         */
1878        @Test
1879        public void testUpdateThroughObservable() throws Exception {
1880                ObservableConverter<Object> observableConverter = new ObservableConverter<Object>() {
1881 
1882                        @Override
1883                        public Observable<Object> convert(Object obj) {
1884                                return obj instanceof Observable ? (Observable<Object>) obj : null;
1885                        }
1886                        
1887                }; 
1888                for (ObjectBusFactory obf: createObjectBus(null)) {
1889                        obf.setObservableConverter(observableConverter);
1890                        LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1891                        if (obf.pkStore) {
1892                                continue; // No PK stores - events are mutable.
1893                        }
1894                        
1895                        LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1896                        
1897                        RemoveHandler2o rh = new RemoveHandler2o();
1898                        introspector.bind(rh, bus);
1899                        
1900                        HelperHandler_o hh = new HelperHandler_o();
1901                        introspector.bind(hh, bus);
1902                        
1903                        if (!obf.useSimpleMatcher) {
1904                                SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\UpdateThroughObservable_"+obf.id+".snapshot"));
1905                                bus.takeSnapshot(emfSnapshot);
1906                                
1907                                takeSnapshot(bus, "snapshots\\UpdateThroughObservable_"+obf.id+".dot");
1908                        }                        
1909                        
1910                        ObservableStringReference aRef = new ObservableStringReference("World");
1911                        
1912                        rh.setExpectedWorld(aRef);
1913                        
1914                        assertTrue(aRef.getObservers().isEmpty());
1915                                                
1916                        Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1917                        bus.join();
1918                        
1919                        assertEquals(1, aRef.getObservers().size());
1920                        
1921                        assertEquals(0, rh.getWorldCounter());
1922                        assertEquals(0, rh.getEmCounter());
1923                        
1924                        assertEquals(1, hh.getWorldCounter());
1925                        assertTrue(hh.isWorldOk());
1926                        
1927                        assertEquals(0, hh.getEmCounter());
1928                        
1929                        aRef.set("!");
1930                        bus.join();
1931                        assertEquals(1, rh.getWorldCounter());
1932                        assertTrue(rh.isWorldOk());                        
1933                        assertEquals(0, rh.getEmCounter());
1934                        
1935                        assertEquals(1, hh.getWorldCounter());
1936                        assertTrue(hh.isWorldOk());
1937                        
1938                        assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1939                        assertTrue(hh.isEmOk());
1940                        
1941                        h.remove();
1942                        bus.join();
1943                        assertEquals(1, rh.getWorldCounter());
1944                        assertTrue(rh.isWorldOk());                        
1945                        assertEquals(1, rh.getEmCounter());
1946                        assertTrue(rh.isEmOk());
1947                        
1948                        assertEquals(1, hh.getWorldCounter());
1949                        assertTrue(hh.isWorldOk());
1950                        
1951                        assertEquals(1, hh.getEmCounter());
1952                        assertTrue(hh.isEmOk());
1953                        
1954                        assertTrue(aRef.getObservers().isEmpty());
1955                        
1956                }
1957        }
1958                                        
1959}

[all classes][com.hammurapi.eventbus.tests]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov