001    /**
002     * 
003     */
004    package com.hammurapi.eventbus.tests;
005    
006    import static junit.framework.Assert.assertEquals;
007    import static junit.framework.Assert.assertFalse;
008    import static junit.framework.Assert.assertNotNull;
009    import static junit.framework.Assert.assertNotSame;
010    import static junit.framework.Assert.assertSame;
011    import static junit.framework.Assert.assertTrue;
012    import static junit.framework.Assert.fail;
013    
014    import java.io.File;
015    import java.lang.management.ManagementFactory;
016    import java.util.ArrayList;
017    import java.util.Collection;
018    import java.util.Collections;
019    import java.util.HashMap;
020    import java.util.HashSet;
021    import java.util.Map;
022    import java.util.Set;
023    import java.util.concurrent.ExecutorService;
024    import java.util.concurrent.Executors;
025    import java.util.concurrent.atomic.AtomicInteger;
026    import java.util.concurrent.atomic.AtomicReference;
027    
028    import org.junit.After;
029    import org.junit.Before;
030    import org.junit.Test;
031    
032    import com.hammurapi.common.MapTokenSource;
033    import com.hammurapi.common.Observable;
034    import com.hammurapi.common.ObservableConverter;
035    import com.hammurapi.common.Observer;
036    import com.hammurapi.common.TokenExpander;
037    import com.hammurapi.common.TokenExpander.TokenSource;
038    import com.hammurapi.eventbus.AbstractEventBus.Handle;
039    import com.hammurapi.eventbus.Derivation;
040    import com.hammurapi.eventbus.DuplicatesFilter;
041    import com.hammurapi.eventbus.EventDispatchContext;
042    import com.hammurapi.eventbus.EventDispatchException;
043    import com.hammurapi.eventbus.EventHandlerBase.Mode;
044    import com.hammurapi.eventbus.InferenceChainLengthFilter;
045    import com.hammurapi.eventbus.InferenceFilter;
046    import com.hammurapi.eventbus.InferencePolicy;
047    import com.hammurapi.eventbus.JavaBinderCompiler;
048    import com.hammurapi.eventbus.local.LocalAbstractEventHandler;
049    import com.hammurapi.eventbus.local.LocalDispatchNetworkDotSnapshot;
050    import com.hammurapi.eventbus.local.LocalEventBus;
051    import com.hammurapi.eventbus.local.LocalEventStore;
052    import com.hammurapi.eventbus.local.LocalEventStoreImpl;
053    import com.hammurapi.eventbus.local.LocalIntrospector;
054    import com.hammurapi.eventbus.local.LocalSimpleMatcher;
055    import com.hammurapi.eventbus.monitoring.JmxStatsCollector;
056    import com.hammurapi.eventbus.snapshot.io.SnapshotOutput;
057    import com.hammurapi.eventbus.tests.familyties.FamilyTiesDispatchNetworkDotSnapshot;
058    import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventBus;
059    import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStore;
060    import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStoreImpl;
061    import com.hammurapi.eventbus.tests.familyties.FamilyTiesIntrospector;
062    import com.hammurapi.eventbus.tests.familyties.model.Child;
063    import com.hammurapi.eventbus.tests.familyties.model.Person;
064    import com.hammurapi.eventbus.tests.familyties.model.Relative;
065    import com.hammurapi.eventbus.tests.familyties.model.Spouse;
066    import com.hammurapi.eventbus.tests.familyties.rules.DaughterRule;
067    import com.hammurapi.eventbus.tests.familyties.rules.DaughterRuleJavaBinder;
068    import com.hammurapi.eventbus.tests.familyties.rules.FamilyTiesRules;
069    import com.hammurapi.eventbus.tests.familyties.rules.GrandRules;
070    import com.hammurapi.eventbus.tests.familyties.rules.GrandRulesJavaBinder;
071    import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRules;
072    import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRulesJavaBinder;
073    import com.hammurapi.eventbus.tests.familyties.rules.ParentRules;
074    import com.hammurapi.eventbus.tests.familyties.rules.ParentRulesJavaBinder;
075    import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRules;
076    import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRulesJavaBinder;
077    import com.hammurapi.eventbus.tests.familyties.rules.SiblingRules;
078    import com.hammurapi.eventbus.tests.familyties.rules.SiblingRulesJavaBinder;
079    import com.hammurapi.eventbus.tests.familyties.rules.SonRule;
080    import com.hammurapi.eventbus.tests.familyties.rules.SonRuleJavaBinder;
081    import com.hammurapi.eventbus.tests.familyties.rules.SpouseRules;
082    import com.hammurapi.eventbus.tests.familyties.rules.SpouseRulesJavaBinder;
083    import com.hammurapi.eventbus.tests.fastfood.Cashier;
084    import com.hammurapi.eventbus.tests.fastfood.Dish;
085    import com.hammurapi.eventbus.tests.fastfood.Kitchen;
086    import com.hammurapi.eventbus.tests.fastfood.Order;
087    import com.hammurapi.eventbus.tests.fastfood.OrderFulfiller;
088    import com.hammurapi.extract.AbstractPredicate;
089    import com.hammurapi.extract.Extractor;
090    import com.hammurapi.extract.InstanceOfPredicate;
091    import com.hammurapi.extract.Predicate;
092    
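/**
 * Tests of {@code LocalEventBus}: binding handlers through {@code LocalIntrospector}
 * and generated Java binders, posting events, joining, and taking dispatch network snapshots.
 *
 * @author Pavel Vlasov
 */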
097    public class LocalEventBusTests {
098            
        /** Thread pool created in {@link #setUp()} and shut down in {@link #tearDown()}. */
        private ExecutorService executorService;
100    
101            /**
102             * @throws java.lang.Exception
103             */
104            @Before
105            public void setUp() throws Exception {
106    //              BlockingQueue<Runnable> wq = new ArrayBlockingQueue<Runnable>(1000);
107                    executorService = Executors.newFixedThreadPool(2);
108            }
109    
110            /**
111             * @throws java.lang.Exception
112             */
113            @After
114            public void tearDown() throws Exception {
115                    executorService.shutdown();
116            }       
117            
118            @Test
119            public void testParallelExecution() throws InterruptedException {
120                    for (ObjectBusFactory obf: createObjectBus(null)) {
121                            if (!obf.isUseExecutor() || obf.inferencePolicy!=InferencePolicy.IMMEDIATELY) {
122                                    continue;
123                            }
124                            
125                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
126                            
127                            final Thread[] invocationThreads = new Thread[3];
128                            invocationThreads[0] = Thread.currentThread();
129                            
130                            final Object[] handled = {null};
131                            /**
132                             * Predicate is used to record its invocation thread.
133                             */
134                            Predicate<Object,Object> predicate = new AbstractPredicate<Object, Object>(0, null, false, 0) {
135            
136                                    @Override
137                                    protected Boolean extractInternal(
138                                                    Object context,
139                                                    Map<Object, Map<Extractor<Object, ? super Boolean, Object>, ? super Boolean>> cache,
140                                                    Object... obj) 
141                                    {                               
142                                            invocationThreads[1] = Thread.currentThread();
143                                            return "Test event".equals(obj[0]);
144                                    }
145                                    
146                            };
147            
148                            /**
149                             * Handler which does nothing, just records its invocation thread.
150                             */
151                            LocalAbstractEventHandler<Object, Integer, Object> handler = new LocalAbstractEventHandler<Object, Integer, Object>() {
152            
153                                    @Override
154                                    public void post(
155                                                    EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
156                                                    Object... events) {
157                                            invocationThreads[2] = Thread.currentThread();
158                                            handled[0] = events[0];
159                                    }
160            
161                            };
162                            
163                            handler.setPredicate(predicate);
164            
165                            bus.addHandler(handler);
166                            
167                            if (!obf.isUseSimpleMatcher()) {
168                                    takeSnapshot(bus, "snapshots\\snapshot1_"+obf.id+".dot");
169                            }
170                            
171                            final String event = "Test event";
172                            bus.post(event);
173                            
174                            // Without delay tasks will be most likely executed in the main thread.
175                            Thread.sleep(100);
176                            
177                            bus.join();
178                            
179                            assertNotNull(invocationThreads[1]);
180                            assertNotNull(invocationThreads[2]);
181    //                      assertNotSame(invocationThreads[0], invocationThreads[1]); // Can't guarantee that it'll be executed by another thread.
182                            assertNotSame(invocationThreads[0], invocationThreads[2]);
183                            assertSame(event, handled[0]);
184                    }
185            }
186    
187            private void takeSnapshot(LocalEventBus<Object, Integer, Object> bus, String fileName) {
188                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File(fileName)));
189            }
190            
191            @Test
192            public void testSimpleStringHandler() throws InterruptedException {
193                    for (StringBusFactory sbf: createStringBus(null)) {
194                            LocalEventBus<String, Integer, Object> bus = sbf.createBus();
195                    
196                            LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);         
197                            SimpleStringHandler handler = new SimpleStringHandler();
198                            introspector.bind(handler, bus);
199                            
200                            if (!sbf.isUseSimpleMatcher()) {
201                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\SimpleStringHandler_"+sbf.id+".dot")));
202                            }
203                            
204                            bus.post("Hello world!");
205                            bus.join();
206                            
207                            assertEquals(1, handler.getCounter());
208                    }
209            }
210            
211            @Test
212            public void testSimpleStringHandler2() throws InterruptedException {
213                    for (ObjectBusFactory obf: createObjectBus(null)) {
214                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
215                    
216                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
217                            SimpleStringHandler handler = new SimpleStringHandler();
218                            introspector.bind(handler, bus);
219                            
220                            if (!obf.isUseSimpleMatcher()) {
221                                    takeSnapshot(bus, "snapshots\\SimpleStringHandler2_"+obf.id+".dot");
222                            }
223                            
224                            bus.post("Hello world!");
225                            bus.join();
226                            
227                            assertEquals(1, handler.getCounter());
228                    }
229            }
230            
231            @Test
232            public void testStringHandlerWithMethodCondition() throws InterruptedException {
233                    for (ObjectBusFactory obf: createObjectBus(null)) {
234                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
235                    
236                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
237                            StringHandlerWithMethodCondition handler = new StringHandlerWithMethodCondition();
238                            introspector.bind(handler, bus);
239                            
240                            if (!obf.isUseSimpleMatcher()) {
241                                    takeSnapshot(bus, "snapshots\\StringHandlerWithMethodCondition_"+obf.id+".dot");
242                            }
243                            
244                            bus.post("Hello");
245                            bus.post("World");
246                            bus.join();
247                            
248                            assertEquals(1, handler.getCounter());
249                            assertTrue(handler.isOk());
250                    }
251            }
252            
253            @Test
254            public void testStringHandlerWithParameterCondition() throws InterruptedException {
255                    for (ObjectBusFactory obf: createObjectBus(null)) {
256                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
257                    
258                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
259                            StringHandlerWithParameterCondition handler = new StringHandlerWithParameterCondition();
260                            introspector.bind(handler, bus);
261                            
262                            if (!obf.isUseSimpleMatcher()) {
263                                    takeSnapshot(bus, "snapshots\\StringHandlerWithParameterCondition_"+obf.id+".dot");
264                            }
265                            
266                            bus.post("Hello");
267                            bus.post("World");
268                            bus.join();
269                            
270                            assertEquals(1, handler.getCounter());
271                            assertTrue(handler.isOk());
272                    }
273            }
274            
275            @Test
276            public void testParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
277                    for (ObjectBusFactory obf: createObjectBus(null)) {
278                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
279                    
280                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
281                            ParameterizedStringHandlerWithMethodCondition handler = new ParameterizedStringHandlerWithMethodCondition();
282                            handler.setFilterStr("Hello");
283                            introspector.bind(handler, bus);
284                            
285                            if (!obf.isUseSimpleMatcher()) {
286                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot"));
287                                    snapshot.setGraphAttribute("dpi", "70");
288                                    bus.takeSnapshot(snapshot);
289                            }
290                            
291                            bus.post("Hello");
292                            bus.post("World");
293                            bus.join();
294                            
295                            assertEquals(1, handler.getCounter());
296                            assertTrue(handler.isOk());
297                    }
298            }
299            
300            
301            @Test
302            public void testTokenParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
303                    for (ObjectBusFactory obf: createObjectBus(null)) {
304                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
305                    
306                            Map<String, String> map = new HashMap<String, String>();
307                            map.put("filterStr", "\"Hello\"");
308                            TokenSource ts = new MapTokenSource(map); 
309                            TokenExpander te = new TokenExpander(ts);
310            
311                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, te);           
312                            TokenParameterizedStringHandlerWithMethodCondition handler = new TokenParameterizedStringHandlerWithMethodCondition();
313                                            
314                            introspector.bind(handler, bus);
315                            
316                            if (!obf.isUseSimpleMatcher()) {
317                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\TokenParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot"));
318                                    snapshot.setGraphAttribute("dpi", "70");
319                                    bus.takeSnapshot(snapshot);
320                            }
321                            
322                            bus.post("Hello");
323                            bus.post("World");
324                            bus.join();
325                            
326                            assertEquals(1, handler.getCounter());
327                            assertTrue(handler.isOk());
328                    }
329            }
330            
331            @Test
332            public void testPost() throws InterruptedException {
333                    for (ObjectBusFactory obf: createObjectBus(null)) {
334                            
335                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
336                    
337                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
338                            StringHandlerWithMethodCondition helloHandler = new StringHandlerWithMethodCondition();         
339                            introspector.bind(helloHandler, bus);
340                            
341                            PostingHandler ph = new PostingHandler();
342                            introspector.bind(ph, bus);
343                            
344                            if (!obf.isUseSimpleMatcher()) {
345                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Posting_"+obf.id+".dot"));
346                                    snapshot.setGraphAttribute("dpi", "70");
347                                    bus.takeSnapshot(snapshot);
348                            }
349                            
350                            bus.post("World");
351                            bus.join();
352                            
353                            assertEquals(1, ph.getWorldCounter());
354                            assertTrue(ph.isWorldOk());
355                            
356                            assertEquals("OBF: "+obf, 1, ph.getEmCounter());
357                            assertTrue(ph.isEmOk());
358                            
359                            assertEquals(obf+",  ", 1, helloHandler.getCounter());
360                            assertTrue(helloHandler.isOk());
361                    }
362    
363            }
364            
365            @Test
366            public void testHandleJoin() throws InterruptedException {
367                    for (ObjectBusFactory obf: createObjectBus(null)) {
368                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
369                                    continue;
370                            }
371                            
372                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
373                    
374                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
375                            SlowPostingHandler ph = new SlowPostingHandler();
376                            introspector.bind(ph, bus);
377                            
378                            if (!obf.isUseSimpleMatcher()) {
379                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\HandleJoin_"+obf.id+".dot"));
380                                    snapshot.setGraphAttribute("dpi", "70");
381                                    bus.takeSnapshot(snapshot);
382                            }
383                            
384                            Handle<Object, Integer, Object, Long> worldHandle = bus.post("World");
385                            worldHandle.join();
386                            
387                            assertEquals(2, ph.getCounter());
388                            
389                            Set<String> words = new HashSet<String>();
390                            words.add("Hello");
391                            words.add("World");
392                            words.add("!");
393                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
394                                    assertTrue(words.remove(h.getEvent()));
395                            }
396                            assertTrue(words.isEmpty());
397                    }
398            }
399            
400            @Test
401            public void testJoin() throws InterruptedException {
402                    for (ObjectBusFactory obf: createObjectBus(null)) {
403                            
404                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
405                                    continue;
406                            }
407                            
408                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
409                    
410                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
411                            JoinHandler joinHandler = new JoinHandler();            
412                            introspector.bind(joinHandler, bus);
413                            
414                            if (!obf.isUseSimpleMatcher()) {
415                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot"));
416                                    snapshot.setGraphAttribute("dpi", "70");
417                                    bus.takeSnapshot(snapshot);
418                            }
419                            
420                            bus.post("Hello");
421                            bus.post(20);
422                            bus.post(1000);
423                            bus.post("Good bye!");
424                            bus.join();
425                            
426                            // Browser snapshot
427                            if (!obf.useSimpleMatcher) {
428                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\Joining_"+obf.id+".snapshot"));
429                                    bus.takeSnapshot(emfSnapshot);
430                            }
431                            
432                            assertEquals(1, joinHandler.getHelloCounter());
433                            assertEquals(1, joinHandler.getJoinCounter());
434                            
435                            assertTrue(joinHandler.isHelloOk());
436                            assertTrue(joinHandler.isJoinOk());
437                            
438                            // Derivation test
439                            for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
440                                    Object event = h.getEvent();
441                                    if (String.valueOf(event).startsWith("Hello World")) {
442                                            Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
443                                            System.out.println(derivations);
444                                    }
445                            }
446                    }
447                    
448            }
449            
450            @SuppressWarnings("unchecked")
451            @Test
452            public void testBinderCompiler() {
453                    JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
454                    
455                    compiler.compileJavaBinder(JoinHandler.class, LocalEventBus.class, null, null);         
456            }
457            
458            @Test
459            public void testFamilyTiesBindingCompilation() {
460                    JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
461                    
462                    compiler.compileJavaBinder(DaughterRule.class, FamilyTiesEventBus.class, null, null);           
463                    compiler.compileJavaBinder(GrandRules.class, FamilyTiesEventBus.class, null, null);             
464                    compiler.compileJavaBinder(ParentChildRules.class, FamilyTiesEventBus.class, null, null);               
465                    compiler.compileJavaBinder(ParentRules.class, FamilyTiesEventBus.class, null, null);            
466                    compiler.compileJavaBinder(SecondaryRules.class, FamilyTiesEventBus.class, null, null);         
467                    compiler.compileJavaBinder(SiblingRules.class, FamilyTiesEventBus.class, null, null);           
468                    compiler.compileJavaBinder(SonRule.class, FamilyTiesEventBus.class, null, null);                
469                    compiler.compileJavaBinder(SpouseRules.class, FamilyTiesEventBus.class, null, null);            
470            }
471            
472            @Test
473            public void testCompiledJoin() throws InterruptedException {
474                    for (ObjectBusFactory obf: createObjectBus(null)) {
475                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
476                                    continue;
477                            }
478                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
479                    
480                            JoinHandler joinHandler = new JoinHandler();
481                            JoinHandlerJavaBinder joinHandlerJavaBinder = new JoinHandlerJavaBinder();
482                            joinHandlerJavaBinder.bind(joinHandler, bus);
483                            
484                            if (!obf.isUseSimpleMatcher()) {
485                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot"));
486                                    snapshot.setGraphAttribute("dpi", "70");
487                                    bus.takeSnapshot(snapshot);
488                            }
489                            
490                            bus.post("Hello");
491                            bus.post(20);
492                            bus.post(1000);
493                            bus.post("Good bye!");
494                            bus.join();
495                            
496                            if (!obf.isUseSimpleMatcher()) {
497                                    // Browser snapshot
498                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledJoining"+obf.id+".snapshot"));
499                                    bus.takeSnapshot(emfSnapshot);
500                            }
501                            
502                            assertEquals(1, joinHandler.getHelloCounter());
503                            assertEquals(1, joinHandler.getJoinCounter());
504                            
505                            assertTrue(joinHandler.isHelloOk());
506                            assertTrue(joinHandler.isJoinOk());
507                            
508                            // Derivation test
509                            for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
510                                    Object event = h.getEvent();
511                                    if (String.valueOf(event).startsWith("Hello World")) {
512                                            Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
513                                            System.out.println(derivations);
514                                    }
515                            }
516                    }               
517            }
518            
519            @Test
520            public void testJoinWithCost() throws InterruptedException {
521                    for (ObjectBusFactory obf: createObjectBus(null)) {
522                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
523                                    continue;
524                            }
525                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
526                    
527                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
528                            JoinHandlerWithCost joinHandler = new JoinHandlerWithCost();            
529                            introspector.bind(joinHandler, bus);
530                            
531                            if (!obf.isUseSimpleMatcher()) {
532                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\JoiningWithCost_"+obf.id+".dot"));
533                                    snapshot.setGraphAttribute("dpi", "70");
534                                    bus.takeSnapshot(snapshot);
535                            }
536                            
537                            bus.post("Hello");
538                            bus.post(20);
539                            bus.post(1000);
540                            bus.post("World");
541                            bus.join();
542                            
543                            assertEquals(1, joinHandler.getJoinCounter());
544                            assertTrue(joinHandler.isJoinOk());
545                    }
546            }
547            
548            @Test
549            public void testParameterNamesJoin() throws InterruptedException {
550                    for (ObjectBusFactory obf: createObjectBus(null)) {
551                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
552                    
553                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
554                            ParameterNamesJoinHandler joinHandler = new ParameterNamesJoinHandler();                
555                            introspector.bind(joinHandler, bus);
556                            
557                            if (!obf.isUseSimpleMatcher()) {
558                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterNamesJoining_"+obf.id+".dot"));
559                                    snapshot.setGraphAttribute("dpi", "70");
560                                    bus.takeSnapshot(snapshot);
561                            }
562                            
563                            bus.post("Hello");
564                            bus.post(20);
565                            bus.post(1000);
566                            bus.post("Good bye!");
567                            bus.join();
568                            
569                            assertEquals(1, joinHandler.getHelloCounter());
570                            assertEquals(1, joinHandler.getJoinCounter());
571                            
572                            assertTrue(joinHandler.isHelloOk());
573                            assertTrue(joinHandler.isJoinOk());
574                    }
575                    
576            }
577            
578            @Test
579            public void testPriority() throws InterruptedException {
580                    for (ObjectBusFactory obf: createObjectBus(null)) {
581                            if (obf.inferencePolicy.compareTo(InferencePolicy.AFTER_HANDLER)>=0) {
582                                    continue; // Priorities work only if inference commands are processed after handler or immediately.
583                            }
584                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
585                    
586                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
587                            PriorityHandler priorityHandler = new PriorityHandler();                
588                            introspector.bind(priorityHandler, bus);
589                            
590                            bus.post("World");
591                            bus.post("Hello");
592                            bus.join();
593                            
594                            assertEquals(obf+", ", 1, priorityHandler.getLpCounter());
595                            assertEquals(obf+", ", 2, priorityHandler.getHpCounter());
596                            assertEquals(obf+", ", 2, priorityHandler.getAhpCounter());
597                    }
598            }
599            
600            @Test
601            public void testOneOff() throws InterruptedException {
602                    for (ObjectBusFactory obf: createObjectBus(null)) {
603                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
604                    
605                            final AtomicInteger counter = new AtomicInteger();
606            
607                            LocalAbstractEventHandler<Object, Integer, Object> eh = new LocalAbstractEventHandler<Object, Integer, Object>(1, 0, null, false, true, Mode.POST) {
608            
609            
610                                    @Override
611                                    public void post(
612                                                    EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
613                                                    Object... events) {
614                                            
615                                            counter.incrementAndGet();                              
616                                    }
617                            };
618                            
619                            bus.addHandler(eh);
620                            
621                            bus.post("World");
622                            bus.post("Hello");
623                            bus.post("!");
624                            bus.join();
625                            
626                            assertEquals(1, counter.get());
627                    }
628            }
629            
630            @Test
631            public void testConsumeJoin() throws InterruptedException {
632                    for (ObjectBusFactory obf: createObjectBus(null)) {
633                            if (!obf.pkStore) {
634                                    continue;
635                            }
636                            
637                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
638                                    continue;
639                            }
640                            
641                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
642                    
643                            OrderFulfiller orderFulfiller = new OrderFulfiller();
644                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
645                            introspector.bind(orderFulfiller, bus);
646            
647                            if (!obf.isUseSimpleMatcher()) {
648                                    LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ConsumeJoin_"+obf.id+".dot"));
649                                    snapshot.setGraphAttribute("dpi", "70");
650                                    bus.takeSnapshot(snapshot);
651                            }
652                            
653                            int totalNumberOfDishes = 800;          
654                            Kitchen kitchen = new Kitchen(totalNumberOfDishes, bus);
655                            
656                            int totalNumberOfOrders = 500;
657                            Cashier cashier = new Cashier(totalNumberOfOrders, bus);
658                            
659                            kitchen.start();
660                            cashier.start();
661            
662                            kitchen.join();
663                            cashier.join();
664                            bus.join();
665                            Thread.sleep(5000);
666                            
667                            assertTrue(orderFulfiller.isOK());
668                            
669                            int fulfilledOrders = 0;
670                            for (Order o: orderFulfiller.getFulfilledOrders()) {
671                                    assertTrue(o.isFulfilled());
672                                    ++fulfilledOrders;
673                            }
674                            
675                            int partialOrderDishes = 0;
676                            int unfulfilledOrders = 0;
677                            Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> orderSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Order.class);
678                            for (Object event: bus.getStore().get(orderSelector, bus.getStore().getPrimaryKeyExtractor(), false, null)) {
679                                    Order order = (Order) event;
680                                    assertNotNull(order);
681                                    assertFalse(order.isFulfilled());
682                                    if (order.getMainDish()!=null) {
683                                            ++partialOrderDishes;
684                                    }
685                                    if (order.getSideDish()!=null) {
686                                            ++partialOrderDishes;
687                                    }
688                                    assertFalse(orderFulfiller.getFulfilledOrders().contains(order));
689                                    ++unfulfilledOrders;
690                            }
691                            assertEquals(totalNumberOfOrders, fulfilledOrders+unfulfilledOrders);
692                            
693                            int remainingDishes = 0;
694                            Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> dishSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Dish.class);
695                            for (Handle<Object, Integer, Object, Long> h: bus.getStore().get(dishSelector)) {
696                                    Dish dish = (Dish) h.getEvent();
697                                    assertNotNull(dish);
698                                    assertFalse(dish.isConsumed());
699                                    ++remainingDishes;
700                            }
701                            assertEquals(totalNumberOfDishes, fulfilledOrders*2+remainingDishes+partialOrderDishes);
702                    }               
703            }
704    
705            @Test
706            public void testPredicateChaining() throws InterruptedException {
707                    for (StringBusFactory obf: createStringBus(null)) {
708                            LocalEventBus<String, Integer, Object> bus = obf.createBus();
709                    
710                            LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);         
711                            PredicateChainingHandler handler = new PredicateChainingHandler();
712                            introspector.bind(handler, bus);
713                            
714                            if (!obf.isUseSimpleMatcher()) {
715                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\PredicateChainingHandler_"+obf.id+".dot")));
716                            }
717                            
718                            bus.post("Hello world!");
719                            bus.join();
720                            
721                            assertTrue(handler.isOneInvoked());
722                            assertTrue(handler.isTwoInvoked());
723                    }
724            }
725            
726            @Test
727            public void testOppositeChaining() throws InterruptedException {
728                    for (ObjectBusFactory obf: createObjectBus(null)) {
729                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
730                    
731                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
732                            OppositeHandler handler = new OppositeHandler();
733                            introspector.bind(handler, bus);
734                            
735                            if (!obf.isUseSimpleMatcher()) {
736                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\OppositeHandler_"+obf.id+".dot")));
737                            }
738                            
739                            bus.post("Hello world!");
740                            bus.join();
741                            
742                            assertFalse(handler.isOneInvoked());
743                            assertTrue(handler.isTwoInvoked());
744                    }
745            }
746            
747            @Test
748            public void testMoreRestrictive() throws InterruptedException {
749                    for (ObjectBusFactory obf: createObjectBus(null)) {
750                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
751                    
752                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
753                            MoreRestrictiveHandler handler = new MoreRestrictiveHandler();
754                            introspector.bind(handler, bus);
755                            
756                            if (!obf.isUseSimpleMatcher()) {
757                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\MoreRestrictiveHandler_"+obf.id+".dot")));
758                            }
759                            
760                            bus.post(Collections.singleton("Hello"));
761                            bus.join();
762                            
763                            assertTrue(handler.isCollectionInvoked());
764                            assertFalse(handler.isListInvoked());
765                    }
766            }
767            
768            @Test
769            public void testLoop() throws InterruptedException {
770                    DuplicatesFilter iFilter = new DuplicatesFilter();
771                    for (ObjectBusFactory obf: createObjectBus(iFilter)) {
772                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
773                            
774                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
775                            LoopHandler handler = new LoopHandler();
776                            introspector.bind(handler, bus);
777                            
778                            if (!obf.isUseSimpleMatcher()) {
779                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\LoopHandler_"+obf.id+".dot")));
780                            }
781                            
782                            bus.post("Hello");
783                            bus.post("Good bye!");
784                            bus.join();
785                            
786                            assertTrue(handler.isHelloOk());
787                            assertEquals(1, handler.getHelloCounter());
788                            
789                            assertTrue(handler.isWorldOk());
790                            assertEquals(1, handler.getWorldCounter());
791                    }
792            }
793            
794            @Test
795            public void testInifinite() throws InterruptedException {
796                    InferenceFilter iFilter = new InferenceChainLengthFilter(100);
797                    for (ObjectBusFactory obf: createObjectBus(iFilter)) {
798                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
799                    
800                            bus.setMaxDerivationDepth(100);
801                            
802                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
803                            InfiniteHandler handler = new InfiniteHandler();
804                            introspector.bind(handler, bus);
805                            
806                            if (!obf.isUseSimpleMatcher()) {
807                                    bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\InfiniteHandler_"+obf.id+".dot")));
808                            }
809                            
810                            Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
811                            handle.join();
812                    }
813            }
814            
815            @Test(expected=EventDispatchException.class)
816            public void testFaulty() throws InterruptedException {
817                    for (ObjectBusFactory obf: createObjectBus(null)) {
818                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
819                    
820                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
821                            FaultyHandler handler = new FaultyHandler();
822                            introspector.bind(handler, bus);
823                            
824                            Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
825                            handle.join();
826                    }
827            }
828    
829            @Test
830            public void testRemoveHandler() throws InterruptedException {
831                    for (ObjectBusFactory obf: createObjectBus(null)) {
832                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
833                    
834                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
835                            SimpleStringHandler handler = new SimpleStringHandler();
836                            Set<Long> keys = introspector.bind(handler, bus);
837                            
838                            Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
839                            handle.join();
840                            assertEquals(1, handler.getCounter());
841                            
842                            bus.removeHandlers(keys);
843                            
844                            SimpleStringHandler handler2 = new SimpleStringHandler();
845                            introspector.bind(handler2, bus);
846                            
847                            Handle<Object, Integer, Object, Long> handle2 = bus.post("Good bye!");
848                            handle2.join();
849                            assertEquals(1, handler.getCounter());
850                            assertEquals(1, handler2.getCounter());
851                    }               
852            }
853            
854            @Test
855            public void testReset() throws InterruptedException {
856                    for (ObjectBusFactory obf: createObjectBus(null)) {
857                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
858                    
859                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);         
860                            StatefulHandler handler = new StatefulHandler();
861                            Set<Long> keys = introspector.bind(handler, bus);
862                            
863                            bus.post("Hello");
864                            bus.post("1");
865                            bus.join();
866                            
867                            assertEquals(2, handler.getState().size());
868                            int kbSize = 0;
869                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
870                                    ++kbSize;
871                            }
872                            assertEquals(2, kbSize);
873                            assertEquals(0, handler.getResetCounter());
874                            
875                            bus.reset();
876                            
877                            assertEquals(0, handler.getState().size());
878                            kbSize = 0;
879                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
880                                    ++kbSize;
881                            }
882                            assertEquals(0, kbSize);
883                            assertEquals(2, handler.getResetCounter());
884                    }                               
885            }
886            
// NOTE(review): not referenced by any of the surrounding test methods or factory classes;
// presumably used elsewhere in this test class - confirm, or remove if unused.
private static int predicateCounter;
888            
889            /**
890             * Helper interface for intantiate bus for tests.
891             * @author Pavel Vlasov
892             *
893             * @param <E>
894             * @param <P>
895             * @param <C>
896             */
897            private class ObjectBusFactory {
898                    
899                    private InferencePolicy inferencePolicy;
900                    private boolean pkStore;
901                    private boolean useExecutor;
902                    private InferenceFilter iFilter;
903                    private int id;
904                    private boolean useSimpleMatcher;
905                    private ObservableConverter<Object> observableConverter;
906    
907                    public ObjectBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
908                            this.inferencePolicy = inferencePolicy;
909                            this.pkStore = pkStore;
910                            this.useExecutor = useExecutor;
911                            this.iFilter = iFilter;
912                            this.id = id;
913                            this.useSimpleMatcher = useSimpleMatcher;
914                    }
915                    
916                    public boolean isUseExecutor() {
917                            return useExecutor;
918                    }
919                    
920                    public boolean isUseSimpleMatcher() {
921                            return useSimpleMatcher;
922                    }
923                    
924                    public InferencePolicy getInferencePolicy() {
925                            return inferencePolicy;
926                    }
927                    
928                    @Override
929                    public String toString() {
930                            return "ObjectBusFactory [inferencePolicy=" + inferencePolicy
931                                            + ", pkStore=" + pkStore + ", useExecutor=" + useExecutor
932                                            + ", iFilter=" + iFilter + ", id=" + id + ", useSimpleMatcher="+useSimpleMatcher+"]";
933                    }
934                    
935                    public void setObservableConverter(ObservableConverter<Object> observableConverter) {
936                            this.observableConverter = observableConverter;
937                    }
938    
939                    LocalEventBus<Object, Integer, Object> createBus() {
940                            LocalEventStoreImpl.Config<Object, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<Object, Integer, Object>();
941                            if (!pkStore) {
942                                    storeConfig.setPrimaryKeyExtractor(null);
943                            }
944                            if (useExecutor) {
945                                    storeConfig.setExecutorService(executorService);
946                            }
947                            LocalEventStore<Object, Integer, Object> eventStore = new LocalEventStoreImpl<Object, Integer, Object>(storeConfig);
948                            
949                            LocalEventBus.Config<Object, Integer, Object> busConfig = new LocalEventBus.Config<Object, Integer, Object>();
950                            busConfig.setEventType(Object.class);
951                            busConfig.setStore(eventStore);
952                            if (useExecutor) {
953                                    busConfig.setExecutorService(executorService);
954                            }
955                            busConfig.setInferencePolicy(inferencePolicy);
956                            busConfig.setInferenceFilter(iFilter);
957                            if (useSimpleMatcher) {
958                                    busConfig.setMatcher(new LocalSimpleMatcher<Object, Integer, Object, LocalEventStore<Object,Integer,Object>>());
959                            }
960                            
961                            busConfig.setObservableConverter(observableConverter);
962                            
963                            return new LocalEventBus<Object, Integer, Object>(busConfig) {
964                                    
965    //                              /**
966    //                               * For troubleshooting.
967    //                               */
968    //                              public Long addHandler(
969    //                                              EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
970    //                                              LocalEventStore<Object,Integer,Object>> eventHandler, 
971    //                                              boolean oneOff, 
972    //                                              Predicate<Object,Object>... predicates) {
973            //
974    //                                      String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
975    //                                      for (int i=0; i<predicates.length; ++i) {
976    //                                              PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));                 
977    //                                      }
978    //                                      
979    //                                      return super.addHandler(eventHandler, oneOff, predicates);
980    //                              }
981    
982                            };
983                    }
984                    
985            }
986            /**
987             * Helper interface for instantiate bus for tests.
988             * @author Pavel Vlasov
989             *
990             * @param <E>
991             * @param <P>
992             * @param <C>
993             */
994            private class StringBusFactory {
995                    
996                    private InferencePolicy inferencePolicy;
997                    private boolean pkStore;
998                    private boolean useExecutor;
999                    private InferenceFilter iFilter;
1000                    private int id;
1001                    private boolean useSimpleMatcher;
1002    
1003                    public StringBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
1004                            this.inferencePolicy = inferencePolicy;
1005                            this.pkStore = pkStore;
1006                            this.useExecutor = useExecutor;
1007                            this.iFilter = iFilter;
1008                            this.id = id;
1009                            this.useSimpleMatcher = useSimpleMatcher;
1010                    }
1011                    
1012                    public boolean isUseExecutor() {
1013                            return useExecutor;
1014                    }
1015                    
1016                    public boolean isUseSimpleMatcher() {
1017                            return useSimpleMatcher;
1018                    }
1019                    
1020                    LocalEventBus<String, Integer, Object> createBus() {
1021                            LocalEventStoreImpl.Config<String, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<String, Integer, Object>();
1022                            if (!pkStore) {
1023                                    storeConfig.setPrimaryKeyExtractor(null);
1024                            }
1025                            if (useExecutor) {
1026                                    storeConfig.setExecutorService(executorService);
1027                            }
1028                            LocalEventStore<String, Integer, Object> eventStore = new LocalEventStoreImpl<String, Integer, Object>(storeConfig);
1029                            
1030                            LocalEventBus.Config<String, Integer, Object> busConfig = new LocalEventBus.Config<String, Integer, Object>();
1031                            busConfig.setEventType(String.class);
1032                            busConfig.setStore(eventStore);
1033                            if (useExecutor) {
1034                                    busConfig.setExecutorService(executorService);
1035                            }
1036                            busConfig.setInferencePolicy(inferencePolicy);
1037                            busConfig.setInferenceFilter(iFilter);
1038                            if (useSimpleMatcher) {
1039                                    busConfig.setMatcher(new LocalSimpleMatcher<String, Integer, Object, LocalEventStore<String,Integer,Object>>());
1040                            }
1041                            
1042                            return new LocalEventBus<String, Integer, Object>(busConfig) {
1043                                    
1044    //                              /**
1045    //                               * For troubleshooting.
1046    //                               */
1047    //                              public Long addHandler(
1048    //                                              EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
1049    //                                              LocalEventStore<Object,Integer,Object>> eventHandler, 
1050    //                                              boolean oneOff, 
1051    //                                              Predicate<Object,Object>... predicates) {
1052            //
1053    //                                      String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
1054    //                                      for (int i=0; i<predicates.length; ++i) {
1055    //                                              PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));                 
1056    //                                      }
1057    //                                      
1058    //                                      return super.addHandler(eventHandler, oneOff, predicates);
1059    //                              }
1060    
1061                            };
1062                    }
1063                    
1064            }
1065    
1066            private Iterable<ObjectBusFactory> createObjectBus(InferenceFilter iFilter) {
1067                    int counter = 0;
1068                    ArrayList<ObjectBusFactory> ret = new ArrayList<ObjectBusFactory>();
1069                    for (InferencePolicy ip: InferencePolicy.values()) {
1070                            ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, true));
1071                            ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, true));
1072                            ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, true));
1073                            ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, true));
1074                            
1075                            ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, false));
1076                            ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, false));
1077                            ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, false));
1078                            ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, false));
1079                    }
1080                    return ret;
1081            }
1082    
1083            private Iterable<StringBusFactory> createStringBus(InferenceFilter iFilter) {
1084                    int counter = 0;
1085                    ArrayList<StringBusFactory> ret = new ArrayList<StringBusFactory>();
1086                    for (InferencePolicy ip: InferencePolicy.values()) {
1087                            ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, true));
1088                            ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, true));
1089                            ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, true));
1090                            ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, true));
1091                            
1092                            ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, false));
1093                            ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, false));
1094                            ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, false));
1095                            ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, false));
1096                    }
1097                    return ret;
1098            }
1099            
1100            @Test
1101            public void testFamilyTies() throws InterruptedException {
1102                    int counter = 0;
1103                    for (InferencePolicy ip: InferencePolicy.values()) {
1104                            if (ip==InferencePolicy.IMMEDIATELY) {
1105                                    continue;
1106                            }
1107                            
1108                            _testFamilyTies(ip, false, false, ++counter, true);
1109                            _testFamilyTies(ip, false, true, ++counter, true);
1110                            _testFamilyTies(ip, true, false, ++counter, true);
1111                            _testFamilyTies(ip, true, true, ++counter, true);
1112                            
1113                            _testFamilyTies(ip, false, false, ++counter, false);
1114                            _testFamilyTies(ip, false, true, ++counter, false);
1115                            _testFamilyTies(ip, true, false, ++counter, false);
1116                            _testFamilyTies(ip, true, true, ++counter, false);                      
1117                    }
1118            }
1119            
1120            private void _testFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {             
1121                    // Create bus
1122                    FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1123                    if (useExecutor) {
1124                            storeConfig.setExecutorService(executorService);
1125                    }
1126                    if (!pkStore) {
1127                            storeConfig.setPrimaryKeyExtractor(null);
1128                    }
1129                    FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1130                    
1131                    FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1132                    busConfig.setStore(eventStore);
1133                    if (useExecutor) {
1134                            busConfig.setExecutorService(executorService);
1135                    }
1136                    if (useSimpleMatcher) {
1137                            busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1138                    }
1139                    
1140                    busConfig.setInferencePolicy(iPolicy);
1141                    busConfig.setInferenceFilter(new DuplicatesFilter());
1142                    
1143                    FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1144                    
1145                    FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1146                    
1147                    // Bind rules.
1148                    introspector.bind(new DaughterRule(), bus);
1149                    introspector.bind(new GrandRules(), bus);
1150                    introspector.bind(new ParentChildRules(), bus);
1151                    introspector.bind(new ParentRules(), bus);
1152                    introspector.bind(new SecondaryRules(), bus);
1153                    introspector.bind(new SiblingRules(), bus);
1154                    introspector.bind(new SonRule(), bus);
1155                    introspector.bind(new SpouseRules(), bus);
1156                    
1157                    if (!useSimpleMatcher) {
1158                            // Take dot snapshot
1159                            bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_"+id+".dot")));
1160                    }
1161    
1162                    // Post seed relationships
1163                    Person kate = new Person("Kate", 58, false);
1164                    Person victor = new Person("Victor", 63, true);
1165                    bus.post(new Spouse(kate, victor));
1166    
1167                    Person peter = new Person("Peter", 37, true);
1168                    bus.post(new Child(peter, kate));
1169                    bus.post(new Child(peter, victor));
1170                    
1171                    Person alison = new Person("Alison", 36, false);
1172                    bus.post(new Spouse(peter, alison));
1173    
1174                    Person lucy = new Person("Lucy", 17, false);
1175                    bus.post(new Child(lucy, alison));
1176                    
1177                    Person nancy = new Person("Nancy", 14, false);
1178                    bus.post(new Child(nancy, peter));
1179                    
1180                    Person dan = new Person("Dan", 7, true);
1181                    bus.post(new Child(dan, peter));
1182                    bus.post(new Child(dan, alison));
1183                    
1184                    Person audrey = new Person("Audrey", 4, false);
1185                    bus.post(new Child(audrey, peter));
1186                    bus.post(new Child(audrey, alison));            
1187                    
1188                    Person tanya = new Person("Tanya", 31, false);
1189                    Person max = new Person("Max", 32, true);
1190                    bus.post(new Spouse(tanya, max));
1191                    bus.post(new Child(tanya, kate));
1192                    bus.post(new Child(tanya, victor));
1193    
1194                    Person vilma = new Person("Vilma", 14, false);
1195                    bus.post(new Child(vilma, tanya));
1196                    
1197                    Person george = new Person("George", 10, true);
1198                    bus.post(new Child(george, tanya));
1199                    
1200                    Person lisa = new Person("Lisa", 5, false);
1201                    bus.post(new Child(lisa, tanya));
1202                    bus.post(new Child(lisa, max));         
1203                                    
1204                    bus.join();
1205                    
1206                    // Take browser snapshot
1207                    if (!useSimpleMatcher) {
1208                            SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\FamilyTies_"+id+".snapshot"));
1209                            bus.takeSnapshot(emfSnapshot);
1210                    }
1211                    
1212                    int numberOfLisaRelatives = 0;
1213                    for (Relative r: bus.getStore().getRelatives(lisa)) {
1214                            ++numberOfLisaRelatives;
1215                    }
1216                    assertEquals(11, numberOfLisaRelatives);
1217                    
1218                    for (Relative relative: bus.getStore().getRelatives(lisa)) {
1219                            System.out.println(relative);
1220                    }
1221            }
1222            
1223            @Test
1224            public void testCompiledFamilyTies() throws InterruptedException {
1225                    int counter = 0;
1226                    for (InferencePolicy ip: InferencePolicy.values()) {
1227                            if (ip==InferencePolicy.IMMEDIATELY) {
1228                                    continue;
1229                            }
1230                            _testCompiledFamilyTies(ip, false, false, ++counter, true);
1231                            _testCompiledFamilyTies(ip, false, true, ++counter, true);
1232                            _testCompiledFamilyTies(ip, true, false, ++counter, true);
1233                            _testCompiledFamilyTies(ip, true, true, ++counter, true);
1234    
1235                            _testCompiledFamilyTies(ip, false, false, ++counter, false);
1236                            _testCompiledFamilyTies(ip, false, true, ++counter, false);
1237                            _testCompiledFamilyTies(ip, true, false, ++counter, false);
1238                            _testCompiledFamilyTies(ip, true, true, ++counter, false);
1239                    }
1240            }
1241            
1242            private void _testCompiledFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {             
1243                    // Create bus
1244                    // Create bus
1245                    FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1246                    if (useExecutor) {
1247                            storeConfig.setExecutorService(executorService);
1248                    }
1249                    if (!pkStore) {
1250                            storeConfig.setPrimaryKeyExtractor(null);
1251                    }
1252                    FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1253                    
1254                    FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1255                    busConfig.setStore(eventStore);
1256                    if (useExecutor) {
1257                            busConfig.setExecutorService(executorService);
1258                    }
1259                    busConfig.setInferencePolicy(iPolicy);
1260                    busConfig.setInferenceFilter(new DuplicatesFilter());
1261                    
1262                    if (useSimpleMatcher) {
1263                            busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1264                    }               
1265                    
1266                    FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1267                    
1268                    // Bind rules.
1269                    new DaughterRuleJavaBinder().bind(new DaughterRule(), bus);
1270                    new GrandRulesJavaBinder().bind(new GrandRules(), bus);
1271                    new ParentChildRulesJavaBinder().bind(new ParentChildRules(), bus);
1272                    new ParentRulesJavaBinder().bind(new ParentRules(), bus);
1273                    new SecondaryRulesJavaBinder().bind(new SecondaryRules(), bus);
1274                    new SiblingRulesJavaBinder().bind(new SiblingRules(), bus);
1275                    new SonRuleJavaBinder().bind(new SonRule(), bus);
1276                    new SpouseRulesJavaBinder().bind(new SpouseRules(), bus);
1277                    
1278                    if (!useSimpleMatcher) {
1279                            // Take dot snapshot
1280                            bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\CompiledFamilyTies_"+id+".dot")));
1281                    }
1282    
1283                    // Post seed relationships
1284                    Person kate = new Person("Kate", 58, false);
1285                    Person victor = new Person("Victor", 63, true);
1286                    bus.post(new Spouse(kate, victor));
1287    
1288                    Person peter = new Person("Peter", 37, true);
1289                    bus.post(new Child(peter, kate));
1290                    bus.post(new Child(peter, victor));
1291                    
1292                    Person alison = new Person("Alison", 36, false);
1293                    bus.post(new Spouse(peter, alison));
1294    
1295                    Person lucy = new Person("Lucy", 17, false);
1296                    bus.post(new Child(lucy, alison));
1297                    
1298                    Person nancy = new Person("Nancy", 14, false);
1299                    bus.post(new Child(nancy, peter));
1300                    
1301                    Person dan = new Person("Dan", 7, true);
1302                    bus.post(new Child(dan, peter));
1303                    bus.post(new Child(dan, alison));
1304                    
1305                    Person audrey = new Person("Audrey", 4, false);
1306                    bus.post(new Child(audrey, peter));
1307                    bus.post(new Child(audrey, alison));            
1308                    
1309                    Person tanya = new Person("Tanya", 31, false);
1310                    Person max = new Person("Max", 32, true);
1311                    bus.post(new Spouse(tanya, max));
1312                    bus.post(new Child(tanya, kate));
1313                    bus.post(new Child(tanya, victor));
1314    
1315                    Person vilma = new Person("Vilma", 14, false);
1316                    bus.post(new Child(vilma, tanya));
1317                    
1318                    Person george = new Person("George", 10, true);
1319                    bus.post(new Child(george, tanya));
1320                    
1321                    Person lisa = new Person("Lisa", 5, false);
1322                    bus.post(new Child(lisa, tanya));
1323                    bus.post(new Child(lisa, max));         
1324                                    
1325                    bus.join();
1326                    
1327                    if (!useSimpleMatcher) {
1328                            // Take browser snapshot
1329                            SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledFamilyTies.snapshot"));
1330                            bus.takeSnapshot(emfSnapshot);
1331                    }
1332                    
1333                    int numberOfLisaRelatives = 0;
1334                    for (Relative r: bus.getStore().getRelatives(lisa)) {
1335                            ++numberOfLisaRelatives;
1336                    }
1337                    assertEquals(11, numberOfLisaRelatives);
1338                    
1339                    for (Relative relative: bus.getStore().getRelatives(lisa)) {
1340                            System.out.println(relative);
1341                    }
1342            }
1343            
1344            @Test
1345            public void testFamilyTiesJmx() throws InterruptedException {   
1346                    
1347                    // Create bus
1348                    FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1349                    storeConfig.setExecutorService(executorService);
1350                    FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1351                    
1352                    FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1353                    busConfig.setStore(eventStore);
1354                    busConfig.setExecutorService(executorService);
1355                    busConfig.setInferenceFilter(new DuplicatesFilter());
1356                    
1357                    JmxStatsCollector jsc = new JmxStatsCollector(ManagementFactory.getPlatformMBeanServer(), "Hammurapi Group:root=Family Ties"); 
1358                    busConfig.setStatsCollector(jsc);
1359                    
1360                    FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1361                    
1362                    Thread.sleep(40000);
1363                    
1364                    FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1365                    
1366                    // Bind rules.
1367                    introspector.bind(new DaughterRule(), bus);
1368                    introspector.bind(new GrandRules(), bus);
1369                    introspector.bind(new ParentChildRules(), bus);
1370                    introspector.bind(new ParentRules(), bus);
1371                    introspector.bind(new SecondaryRules(), bus);
1372                    introspector.bind(new SiblingRules(), bus);
1373                    introspector.bind(new SonRule(), bus);
1374                    introspector.bind(new SpouseRules(), bus);
1375                    
1376                    // Take dot snapshot
1377                    bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_Jmx.dot")));          
1378    
1379                    // Post seed relationships
1380                    Person kate = new Person("Kate", 58, false);
1381                    Person victor = new Person("Victor", 63, true);
1382                    bus.post(new Spouse(kate, victor));
1383                    Thread.sleep(2000);
1384    
1385                    Person peter = new Person("Peter", 37, true);
1386                    bus.post(new Child(peter, kate));
1387                    Thread.sleep(2000);
1388                    bus.post(new Child(peter, victor));
1389                    Thread.sleep(2000);
1390                    
1391                    Person alison = new Person("Alison", 36, false);
1392                    bus.post(new Spouse(peter, alison));
1393                    Thread.sleep(2000);
1394    
1395                    Person lucy = new Person("Lucy", 17, false);
1396                    bus.post(new Child(lucy, alison));
1397                    Thread.sleep(2000);
1398                    
1399                    Person nancy = new Person("Nancy", 14, false);
1400                    bus.post(new Child(nancy, peter));
1401                    Thread.sleep(2000);
1402                    
1403                    Person dan = new Person("Dan", 7, true);
1404                    bus.post(new Child(dan, peter));
1405                    Thread.sleep(2000);
1406                    bus.post(new Child(dan, alison));
1407                    Thread.sleep(2000);
1408                    
1409                    Person audrey = new Person("Audrey", 4, false);
1410                    bus.post(new Child(audrey, peter));
1411                    Thread.sleep(2000);
1412                    bus.post(new Child(audrey, alison));            
1413                    Thread.sleep(2000);
1414                    
1415                    Person tanya = new Person("Tanya", 31, false);
1416                    Person max = new Person("Max", 32, true);
1417                    bus.post(new Spouse(tanya, max));
1418                    Thread.sleep(2000);
1419                    bus.post(new Child(tanya, kate));
1420                    Thread.sleep(2000);
1421                    bus.post(new Child(tanya, victor));
1422                    Thread.sleep(2000);
1423    
1424                    Person vilma = new Person("Vilma", 14, false);
1425                    bus.post(new Child(vilma, tanya));
1426                    Thread.sleep(2000);
1427                    
1428                    Person george = new Person("George", 10, true);
1429                    bus.post(new Child(george, tanya));
1430                    Thread.sleep(2000);
1431                    
1432                    Person lisa = new Person("Lisa", 5, false);
1433                    bus.post(new Child(lisa, tanya));
1434                    Thread.sleep(2000);
1435                    bus.post(new Child(lisa, max));         
1436                    Thread.sleep(2000);
1437                                    
1438                    Thread.sleep(30000);
1439                    
1440                    bus.join();
1441                    
1442            }
1443            
1444            // --- Remove tests ---
1445            
1446            /**
1447             * - Post event to the bus
1448             * - Check presence
1449             * - Remove event using bus.remove()
1450             * - Check that event is gone from the store.
1451             */
1452            @Test
1453            public void testRemoveWithBus() throws Exception {
1454                    for (ObjectBusFactory obf: createObjectBus(null)) {
1455                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1456                            
1457                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1458                            
1459                            PostingHandler ph = new PostingHandler();
1460                            introspector.bind(ph, bus);
1461                                                    
1462                            bus.post("World");
1463                            bus.join();
1464                            
1465                            assertEquals(1, ph.getWorldCounter());
1466                            assertTrue(ph.isWorldOk());
1467                            
1468                            assertEquals(1, ph.getEmCounter());
1469                            assertTrue(ph.isEmOk());
1470                            
1471                            Set<String> expected = new HashSet<String>();
1472                            expected.add("Hello");
1473                            expected.add("World");
1474                            expected.add("!");
1475                                                                            
1476                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1477                                    if (!expected.remove(h.getEvent())) {
1478                                            fail("Unexpected object in the store: "+h.getEvent());
1479                                    }
1480                            }
1481                            
1482                            if (!expected.isEmpty()) {
1483                                    fail("Not present in the store: "+expected);
1484                            }
1485                            
1486                            bus.remove("World");
1487                            bus.join();
1488                            
1489                            for (Object o: bus.getStore()) {
1490                                    fail("Unexpected object in the store: "+o);
1491                            }                       
1492                    }
1493            }
1494            
1495            /**
1496             * - Post event to the bus
1497             * - Check presence
1498             * - Remove event using Handle.remove()
1499             * - Check that event is gone from the store.
1500             */
1501            @Test
1502            public void testRemoveWithHandle() throws Exception {
1503                    for (ObjectBusFactory obf: createObjectBus(null)) {
1504                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1505                            
1506                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1507                            
1508                            PostingHandler ph = new PostingHandler();
1509                            introspector.bind(ph, bus);
1510                                                    
1511                            Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1512                            bus.join();
1513                            
1514                            assertEquals(1, ph.getWorldCounter());
1515                            assertTrue(ph.isWorldOk());
1516                            
1517                            assertEquals(1, ph.getEmCounter());
1518                            assertTrue(ph.isEmOk());
1519                            
1520                            Set<String> expected = new HashSet<String>();
1521                            expected.add("Hello");
1522                            expected.add("World");
1523                            expected.add("!");
1524                                                    
1525                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1526                                    if (!expected.remove(h.getEvent())) {
1527                                            fail("Unexpected object in the store: "+h.getEvent());
1528                                    }
1529                            }
1530                            
1531                            if (!expected.isEmpty()) {
1532                                    fail("Not present in the store: "+expected);
1533                            }
1534                            
1535                            wHandle.remove();
1536                            bus.join();
1537                            
1538                            for (Object o: bus.getStore()) {
1539                                    fail("Unexpected object in the store: "+o);
1540                            }                       
1541                    }
1542            }
1543                    
1544            /**
1545             * - Post event
1546             * - Validate that remove handler not fired
1547             * - Remove event
1548             * - Validate event handler fired
1549             * @throws Exception
1550             */
1551            @Test
1552            public void testRemoveHandlers() throws Exception {
1553                    for (ObjectBusFactory obf: createObjectBus(null)) {
1554                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();                     
1555                            
1556                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1557                            
1558                            PostingHandler ph = new PostingHandler();
1559                            introspector.bind(ph, bus);
1560                            
1561                            RemoveHandler rh = new RemoveHandler();
1562                            introspector.bind(rh, bus);                     
1563                            if (!obf.useSimpleMatcher) {
1564                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandlers_"+obf.id+".snapshot"));
1565                                    bus.takeSnapshot(emfSnapshot);
1566                                    
1567                                    takeSnapshot(bus, "snapshots\\RemoveHandlers_"+obf.id+".dot");
1568                            }                       
1569                                                    
1570                            Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1571                            bus.join();
1572                            
1573                            assertEquals(1, ph.getWorldCounter());
1574                            assertTrue(ph.isWorldOk());
1575                            
1576                            assertEquals(1, ph.getEmCounter());
1577                            assertTrue(ph.isEmOk());
1578                            
1579                            assertEquals(0, rh.getWorldCounter());                  
1580                            assertEquals(0, rh.getEmCounter());
1581                            
1582                            Set<String> expected = new HashSet<String>();
1583                            expected.add("Hello");
1584                            expected.add("World");
1585                            expected.add("!");
1586                                                    
1587                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1588                                    if (!expected.remove(h.getEvent())) {
1589                                            fail("Unexpected object in the store: "+h.getEvent());
1590                                    }
1591                            }
1592                            
1593                            if (!expected.isEmpty()) {
1594                                    fail("Not present in the store: "+expected);
1595                            }
1596                            
1597                            wHandle.remove();
1598                            bus.join();
1599                            
1600                            int counter=0;
1601                            for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1602                                    ++counter;
1603                                    if (!"Bye!".equals(h.getEvent())) {
1604                                                    fail("Unexpected object in the store: "+h.getEvent());
1605                                    }
1606                            }                               
1607                            assertEquals(1, ph.getWorldCounter());
1608                            assertTrue(ph.isWorldOk());
1609                            
1610                            assertEquals(1, ph.getEmCounter());
1611                            assertTrue(ph.isEmOk());
1612                            
1613                            assertEquals(1, rh.getWorldCounter());                  
1614                            assertTrue(rh.isWorldOk());
1615                            
1616                            assertEquals(1, rh.getEmCounter());
1617                            assertTrue(rh.isEmOk());
1618                            
1619                            assertEquals(String.valueOf(obf), 1, counter);          
1620                    }
1621            }
1622            
1623            /**
1624             * - Post 1st event, validate not fired
1625             * - Post 2nd event, validate not fired
1626             * - Remove 2nd event, validate fired
1627             * - Remove 1st event, validate not fired 
1628             * @throws Exception
1629             */
1630            @Test
1631            public void testFireJoinRemoveHandler() throws Exception {
1632                    for (ObjectBusFactory obf: createObjectBus(null)) {
1633                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();                     
1634                            
1635                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1636                            
1637                            JoinRemoveHandler jrh = new JoinRemoveHandler();
1638                            introspector.bind(jrh, bus);
1639                            
1640                            if (!obf.useSimpleMatcher) {
1641                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\JoinRemoveHandler_"+obf.id+".snapshot"));
1642                                    bus.takeSnapshot(emfSnapshot);
1643                                    
1644                                    takeSnapshot(bus, "snapshots\\JoinRemoveHandler_"+obf.id+".dot");
1645                            }                       
1646                                                    
1647                            bus.post("World");
1648                            bus.join();
1649                            
1650                            assertEquals(0, jrh.getJoinCounter());
1651                            
1652                            bus.post("Hello");
1653                            bus.join();
1654                            
1655                            assertEquals(0, jrh.getJoinCounter());
1656                            
1657                            bus.remove("Hello");
1658                            bus.join();
1659                            
1660                            assertEquals(1, jrh.getJoinCounter());
1661                            assertTrue(jrh.isJoinOk());
1662                            
1663                            bus.remove("World");
1664                            bus.join();
1665                            
1666                            assertEquals(1, jrh.getJoinCounter());
1667                            assertTrue(jrh.isJoinOk());                     
1668                    }
1669            }
1670            
1671            /**
1672             * - Post event which matcher remove handler 1
1673             * - Update event with value which matches remove handler 2
1674             * - Verify that remove handler 1 is fired
1675             * - Remove event
1676             * - Verify that remove handler 2 is fired
1677             * @throws Exception
1678             */
1679            @Test
1680            public void testUpdateFireRemoveHandlers() throws Exception {
1681                    for (ObjectBusFactory obf: createObjectBus(null)) {
1682                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1683                            if (obf.pkStore) {
1684                                    continue; // No PK stores - events are mutable.
1685                            }
1686                            
1687                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1688                            
1689                            RemoveHandler2 rh = new RemoveHandler2();
1690                            introspector.bind(rh, bus);
1691                            
1692                            HelperHandler hh = new HelperHandler();
1693                            introspector.bind(hh, bus);
1694                            
1695                            if (!obf.useSimpleMatcher) {
1696                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1697                                    bus.takeSnapshot(emfSnapshot);
1698                                    
1699                                    takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1700                            }                       
1701                            
1702                            AtomicReference<String> aRef = new AtomicReference<String>("World");
1703                            
1704                            rh.setExpectedWorld(aRef);
1705                                                    
1706                            Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1707                            bus.join();
1708                            
1709                            assertEquals(0, rh.getWorldCounter());
1710                            assertEquals(0, rh.getEmCounter());
1711                            
1712                            assertEquals(1, hh.getWorldCounter());
1713                            assertTrue(hh.isWorldOk());
1714                            
1715                            assertEquals(0, hh.getEmCounter());
1716                            
1717                            aRef.set("!");
1718                            bus.join();
1719                            assertEquals(0, rh.getWorldCounter());
1720                            assertEquals(0, rh.getEmCounter());
1721                            
1722                            assertEquals(1, hh.getWorldCounter());
1723                            assertTrue(hh.isWorldOk());
1724                            
1725                            assertEquals(0, hh.getEmCounter());
1726                            
1727                            h.update();
1728                            bus.join();
1729                            assertEquals(1, rh.getWorldCounter());
1730                            assertTrue(rh.isWorldOk());                     
1731                            assertEquals(0, rh.getEmCounter());
1732                            
1733                            assertEquals(1, hh.getWorldCounter());
1734                            assertTrue(hh.isWorldOk());
1735                            
1736                            assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1737                            assertTrue(hh.isEmOk());
1738                            
1739                            h.remove();
1740                            bus.join();
1741                            assertEquals(1, rh.getWorldCounter());
1742                            assertTrue(rh.isWorldOk());                     
1743                            assertEquals(1, rh.getEmCounter());
1744                            assertTrue(rh.isEmOk());
1745                            
1746                            assertEquals(1, hh.getWorldCounter());
1747                            assertTrue(hh.isWorldOk());
1748                            
1749                            assertEquals(1, hh.getEmCounter());
1750                            assertTrue(hh.isEmOk());
1751                            
1752                    }
1753            }
1754            
1755            /**
1756             * Update event through dispatch context. Verify that remove
1757             * handlers for old value are fired and that post handlers for 
1758             * new value are fired. Use join handler or AFTER_EVENT policy.
1759             * @throws Exception
1760             */
1761            @Test
1762            public void testUpdateFromHandler() throws Exception {
1763                    for (ObjectBusFactory obf: createObjectBus(null)) {
1764                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1765                            if (obf.pkStore) {
1766                                    continue; // No PK stores - events are mutable.
1767                            }
1768                            
1769                            if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
1770                                    continue;
1771                            }
1772                            
1773                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1774                            
1775                            RemoveHandler2 rh = new RemoveHandler2();
1776                            introspector.bind(rh, bus);
1777                            
1778                            HelperHandler2 hh = new HelperHandler2();
1779                            introspector.bind(hh, bus);
1780                            
1781                            if (!obf.useSimpleMatcher) {
1782                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1783                                    bus.takeSnapshot(emfSnapshot);
1784                                    
1785                                    takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1786                            }                       
1787                            
1788                            AtomicReference<String> aRef = new AtomicReference<String>("World");
1789                            
1790                            rh.setExpectedWorld(aRef);
1791                                                    
1792                            Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1793                            bus.join();
1794                            
1795                            assertEquals(0, rh.getWorldCounter());
1796                            assertEquals(0, rh.getEmCounter());
1797                            
1798                            assertEquals(1, hh.getWorldCounter());
1799                            assertTrue(hh.isWorldOk());
1800                            
1801                            assertEquals(0, hh.getEmCounter());
1802                                                    
1803                            bus.post("Hello"); // Hello fires join handler with World, which updates World to !
1804                            bus.join();
1805                            assertEquals(1, hh.getJoinCounter());
1806                            assertTrue(hh.isJoinOk());
1807                            
1808                            assertEquals(1, rh.getWorldCounter());
1809                            assertTrue(rh.isWorldOk());                     
1810                            assertEquals(0, rh.getEmCounter());
1811                            
1812                            assertEquals(1, hh.getWorldCounter());
1813                            assertTrue(hh.isWorldOk());
1814                            
1815                            assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1816                            assertTrue(hh.isEmOk());
1817                            
1818                            h.remove();
1819                            bus.join();
1820                            assertEquals(1, rh.getWorldCounter());
1821                            assertTrue(rh.isWorldOk());                     
1822                            assertEquals(1, rh.getEmCounter());
1823                            assertTrue(rh.isEmOk());
1824                            
1825                            assertEquals(1, hh.getWorldCounter());
1826                            assertTrue(hh.isWorldOk());
1827                            
1828                            assertEquals(1, hh.getEmCounter());
1829                            assertTrue(hh.isEmOk());
1830                            
1831                    }
1832            }
1833            
1834            public static class ObservableStringReference implements Observable<ObservableStringReference> {
1835                    
1836                    public ObservableStringReference(String value) {
1837                            super();
1838                            this.value = value;
1839                    }
1840    
1841                    private Collection<Observer<? super ObservableStringReference>> observers = new ArrayList<Observer<? super ObservableStringReference>>();
1842    
1843                    @Override
1844                    public synchronized void addObserver(Observer<? super ObservableStringReference> o) {                                     
1845                            observers.add(o);
1846                    }
1847    
1848                    @Override
1849                    public synchronized void deleteObserver(Observer<? super ObservableStringReference> o) {
1850                            observers.remove(o);
1851                    }
1852                    
1853                    public Collection<Observer<? super ObservableStringReference>> getObservers() {
1854                            return observers;
1855                    }
1856                    
1857                    private volatile String value;
1858                    
1859                    public synchronized void set(String value) {
1860                            this.value = value;
1861                            for (Observer<? super ObservableStringReference> o: observers) {
1862                                    o.update(this);
1863                            }
1864                    }
1865                    
1866                    public String get() {
1867                            return value;
1868                    }
1869    
1870                    
1871            };
1872            
1873            /**
1874             * - Post event which matches handler 1, verify fired
1875             * - Update event to match handler 2, verify that handler 2 is fired
1876             * @throws Exception
1877             */
1878            @Test
1879            public void testUpdateThroughObservable() throws Exception {
1880                    ObservableConverter<Object> observableConverter = new ObservableConverter<Object>() {
1881    
1882                            @Override
1883                            public Observable<Object> convert(Object obj) {
1884                                    return obj instanceof Observable ? (Observable<Object>) obj : null;
1885                            }
1886                            
1887                    }; 
1888                    for (ObjectBusFactory obf: createObjectBus(null)) {
1889                            obf.setObservableConverter(observableConverter);
1890                            LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1891                            if (obf.pkStore) {
1892                                    continue; // No PK stores - events are mutable.
1893                            }
1894                            
1895                            LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1896                            
1897                            RemoveHandler2o rh = new RemoveHandler2o();
1898                            introspector.bind(rh, bus);
1899                            
1900                            HelperHandler_o hh = new HelperHandler_o();
1901                            introspector.bind(hh, bus);
1902                            
1903                            if (!obf.useSimpleMatcher) {
1904                                    SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\UpdateThroughObservable_"+obf.id+".snapshot"));
1905                                    bus.takeSnapshot(emfSnapshot);
1906                                    
1907                                    takeSnapshot(bus, "snapshots\\UpdateThroughObservable_"+obf.id+".dot");
1908                            }                       
1909                            
1910                            ObservableStringReference aRef = new ObservableStringReference("World");
1911                            
1912                            rh.setExpectedWorld(aRef);
1913                            
1914                            assertTrue(aRef.getObservers().isEmpty());
1915                                                    
1916                            Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1917                            bus.join();
1918                            
1919                            assertEquals(1, aRef.getObservers().size());
1920                            
1921                            assertEquals(0, rh.getWorldCounter());
1922                            assertEquals(0, rh.getEmCounter());
1923                            
1924                            assertEquals(1, hh.getWorldCounter());
1925                            assertTrue(hh.isWorldOk());
1926                            
1927                            assertEquals(0, hh.getEmCounter());
1928                            
1929                            aRef.set("!");
1930                            bus.join();
1931                            assertEquals(1, rh.getWorldCounter());
1932                            assertTrue(rh.isWorldOk());                     
1933                            assertEquals(0, rh.getEmCounter());
1934                            
1935                            assertEquals(1, hh.getWorldCounter());
1936                            assertTrue(hh.isWorldOk());
1937                            
1938                            assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1939                            assertTrue(hh.isEmOk());
1940                            
1941                            h.remove();
1942                            bus.join();
1943                            assertEquals(1, rh.getWorldCounter());
1944                            assertTrue(rh.isWorldOk());                     
1945                            assertEquals(1, rh.getEmCounter());
1946                            assertTrue(rh.isEmOk());
1947                            
1948                            assertEquals(1, hh.getWorldCounter());
1949                            assertTrue(hh.isWorldOk());
1950                            
1951                            assertEquals(1, hh.getEmCounter());
1952                            assertTrue(hh.isEmOk());
1953                            
1954                            assertTrue(aRef.getObservers().isEmpty());
1955                            
1956                    }
1957            }
1958                                            
1959    }