001 /**
002 *
003 */
004 package com.hammurapi.eventbus.tests;
005
006 import static junit.framework.Assert.assertEquals;
007 import static junit.framework.Assert.assertFalse;
008 import static junit.framework.Assert.assertNotNull;
009 import static junit.framework.Assert.assertNotSame;
010 import static junit.framework.Assert.assertSame;
011 import static junit.framework.Assert.assertTrue;
012 import static junit.framework.Assert.fail;
013
014 import java.io.File;
015 import java.lang.management.ManagementFactory;
016 import java.util.ArrayList;
017 import java.util.Collection;
018 import java.util.Collections;
019 import java.util.HashMap;
020 import java.util.HashSet;
021 import java.util.Map;
022 import java.util.Set;
023 import java.util.concurrent.ExecutorService;
024 import java.util.concurrent.Executors;
025 import java.util.concurrent.atomic.AtomicInteger;
026 import java.util.concurrent.atomic.AtomicReference;
027
028 import org.junit.After;
029 import org.junit.Before;
030 import org.junit.Test;
031
032 import com.hammurapi.common.MapTokenSource;
033 import com.hammurapi.common.Observable;
034 import com.hammurapi.common.ObservableConverter;
035 import com.hammurapi.common.Observer;
036 import com.hammurapi.common.TokenExpander;
037 import com.hammurapi.common.TokenExpander.TokenSource;
038 import com.hammurapi.eventbus.AbstractEventBus.Handle;
039 import com.hammurapi.eventbus.Derivation;
040 import com.hammurapi.eventbus.DuplicatesFilter;
041 import com.hammurapi.eventbus.EventDispatchContext;
042 import com.hammurapi.eventbus.EventDispatchException;
043 import com.hammurapi.eventbus.EventHandlerBase.Mode;
044 import com.hammurapi.eventbus.InferenceChainLengthFilter;
045 import com.hammurapi.eventbus.InferenceFilter;
046 import com.hammurapi.eventbus.InferencePolicy;
047 import com.hammurapi.eventbus.JavaBinderCompiler;
048 import com.hammurapi.eventbus.local.LocalAbstractEventHandler;
049 import com.hammurapi.eventbus.local.LocalDispatchNetworkDotSnapshot;
050 import com.hammurapi.eventbus.local.LocalEventBus;
051 import com.hammurapi.eventbus.local.LocalEventStore;
052 import com.hammurapi.eventbus.local.LocalEventStoreImpl;
053 import com.hammurapi.eventbus.local.LocalIntrospector;
054 import com.hammurapi.eventbus.local.LocalSimpleMatcher;
055 import com.hammurapi.eventbus.monitoring.JmxStatsCollector;
056 import com.hammurapi.eventbus.snapshot.io.SnapshotOutput;
057 import com.hammurapi.eventbus.tests.familyties.FamilyTiesDispatchNetworkDotSnapshot;
058 import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventBus;
059 import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStore;
060 import com.hammurapi.eventbus.tests.familyties.FamilyTiesEventStoreImpl;
061 import com.hammurapi.eventbus.tests.familyties.FamilyTiesIntrospector;
062 import com.hammurapi.eventbus.tests.familyties.model.Child;
063 import com.hammurapi.eventbus.tests.familyties.model.Person;
064 import com.hammurapi.eventbus.tests.familyties.model.Relative;
065 import com.hammurapi.eventbus.tests.familyties.model.Spouse;
066 import com.hammurapi.eventbus.tests.familyties.rules.DaughterRule;
067 import com.hammurapi.eventbus.tests.familyties.rules.DaughterRuleJavaBinder;
068 import com.hammurapi.eventbus.tests.familyties.rules.FamilyTiesRules;
069 import com.hammurapi.eventbus.tests.familyties.rules.GrandRules;
070 import com.hammurapi.eventbus.tests.familyties.rules.GrandRulesJavaBinder;
071 import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRules;
072 import com.hammurapi.eventbus.tests.familyties.rules.ParentChildRulesJavaBinder;
073 import com.hammurapi.eventbus.tests.familyties.rules.ParentRules;
074 import com.hammurapi.eventbus.tests.familyties.rules.ParentRulesJavaBinder;
075 import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRules;
076 import com.hammurapi.eventbus.tests.familyties.rules.SecondaryRulesJavaBinder;
077 import com.hammurapi.eventbus.tests.familyties.rules.SiblingRules;
078 import com.hammurapi.eventbus.tests.familyties.rules.SiblingRulesJavaBinder;
079 import com.hammurapi.eventbus.tests.familyties.rules.SonRule;
080 import com.hammurapi.eventbus.tests.familyties.rules.SonRuleJavaBinder;
081 import com.hammurapi.eventbus.tests.familyties.rules.SpouseRules;
082 import com.hammurapi.eventbus.tests.familyties.rules.SpouseRulesJavaBinder;
083 import com.hammurapi.eventbus.tests.fastfood.Cashier;
084 import com.hammurapi.eventbus.tests.fastfood.Dish;
085 import com.hammurapi.eventbus.tests.fastfood.Kitchen;
086 import com.hammurapi.eventbus.tests.fastfood.Order;
087 import com.hammurapi.eventbus.tests.fastfood.OrderFulfiller;
088 import com.hammurapi.extract.AbstractPredicate;
089 import com.hammurapi.extract.Extractor;
090 import com.hammurapi.extract.InstanceOfPredicate;
091 import com.hammurapi.extract.Predicate;
092
093 /**
094 * @author Pavel Vlasov
095 *
096 */
097 public class LocalEventBusTests {
098
099 private ExecutorService executorService;
100
101 /**
102 * @throws java.lang.Exception
103 */
104 @Before
105 public void setUp() throws Exception {
106 // BlockingQueue<Runnable> wq = new ArrayBlockingQueue<Runnable>(1000);
107 executorService = Executors.newFixedThreadPool(2);
108 }
109
110 /**
111 * @throws java.lang.Exception
112 */
113 @After
114 public void tearDown() throws Exception {
115 executorService.shutdown();
116 }
117
118 @Test
119 public void testParallelExecution() throws InterruptedException {
120 for (ObjectBusFactory obf: createObjectBus(null)) {
121 if (!obf.isUseExecutor() || obf.inferencePolicy!=InferencePolicy.IMMEDIATELY) {
122 continue;
123 }
124
125 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
126
127 final Thread[] invocationThreads = new Thread[3];
128 invocationThreads[0] = Thread.currentThread();
129
130 final Object[] handled = {null};
131 /**
132 * Predicate is used to record its invocation thread.
133 */
134 Predicate<Object,Object> predicate = new AbstractPredicate<Object, Object>(0, null, false, 0) {
135
136 @Override
137 protected Boolean extractInternal(
138 Object context,
139 Map<Object, Map<Extractor<Object, ? super Boolean, Object>, ? super Boolean>> cache,
140 Object... obj)
141 {
142 invocationThreads[1] = Thread.currentThread();
143 return "Test event".equals(obj[0]);
144 }
145
146 };
147
148 /**
149 * Handler which does nothing, just records its invocation thread.
150 */
151 LocalAbstractEventHandler<Object, Integer, Object> handler = new LocalAbstractEventHandler<Object, Integer, Object>() {
152
153 @Override
154 public void post(
155 EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
156 Object... events) {
157 invocationThreads[2] = Thread.currentThread();
158 handled[0] = events[0];
159 }
160
161 };
162
163 handler.setPredicate(predicate);
164
165 bus.addHandler(handler);
166
167 if (!obf.isUseSimpleMatcher()) {
168 takeSnapshot(bus, "snapshots\\snapshot1_"+obf.id+".dot");
169 }
170
171 final String event = "Test event";
172 bus.post(event);
173
174 // Without delay tasks will be most likely executed in the main thread.
175 Thread.sleep(100);
176
177 bus.join();
178
179 assertNotNull(invocationThreads[1]);
180 assertNotNull(invocationThreads[2]);
181 // assertNotSame(invocationThreads[0], invocationThreads[1]); // Can't guarantee that it'll be executed by another thread.
182 assertNotSame(invocationThreads[0], invocationThreads[2]);
183 assertSame(event, handled[0]);
184 }
185 }
186
187 private void takeSnapshot(LocalEventBus<Object, Integer, Object> bus, String fileName) {
188 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File(fileName)));
189 }
190
191 @Test
192 public void testSimpleStringHandler() throws InterruptedException {
193 for (StringBusFactory sbf: createStringBus(null)) {
194 LocalEventBus<String, Integer, Object> bus = sbf.createBus();
195
196 LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);
197 SimpleStringHandler handler = new SimpleStringHandler();
198 introspector.bind(handler, bus);
199
200 if (!sbf.isUseSimpleMatcher()) {
201 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\SimpleStringHandler_"+sbf.id+".dot")));
202 }
203
204 bus.post("Hello world!");
205 bus.join();
206
207 assertEquals(1, handler.getCounter());
208 }
209 }
210
211 @Test
212 public void testSimpleStringHandler2() throws InterruptedException {
213 for (ObjectBusFactory obf: createObjectBus(null)) {
214 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
215
216 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
217 SimpleStringHandler handler = new SimpleStringHandler();
218 introspector.bind(handler, bus);
219
220 if (!obf.isUseSimpleMatcher()) {
221 takeSnapshot(bus, "snapshots\\SimpleStringHandler2_"+obf.id+".dot");
222 }
223
224 bus.post("Hello world!");
225 bus.join();
226
227 assertEquals(1, handler.getCounter());
228 }
229 }
230
231 @Test
232 public void testStringHandlerWithMethodCondition() throws InterruptedException {
233 for (ObjectBusFactory obf: createObjectBus(null)) {
234 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
235
236 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
237 StringHandlerWithMethodCondition handler = new StringHandlerWithMethodCondition();
238 introspector.bind(handler, bus);
239
240 if (!obf.isUseSimpleMatcher()) {
241 takeSnapshot(bus, "snapshots\\StringHandlerWithMethodCondition_"+obf.id+".dot");
242 }
243
244 bus.post("Hello");
245 bus.post("World");
246 bus.join();
247
248 assertEquals(1, handler.getCounter());
249 assertTrue(handler.isOk());
250 }
251 }
252
253 @Test
254 public void testStringHandlerWithParameterCondition() throws InterruptedException {
255 for (ObjectBusFactory obf: createObjectBus(null)) {
256 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
257
258 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
259 StringHandlerWithParameterCondition handler = new StringHandlerWithParameterCondition();
260 introspector.bind(handler, bus);
261
262 if (!obf.isUseSimpleMatcher()) {
263 takeSnapshot(bus, "snapshots\\StringHandlerWithParameterCondition_"+obf.id+".dot");
264 }
265
266 bus.post("Hello");
267 bus.post("World");
268 bus.join();
269
270 assertEquals(1, handler.getCounter());
271 assertTrue(handler.isOk());
272 }
273 }
274
275 @Test
276 public void testParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
277 for (ObjectBusFactory obf: createObjectBus(null)) {
278 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
279
280 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
281 ParameterizedStringHandlerWithMethodCondition handler = new ParameterizedStringHandlerWithMethodCondition();
282 handler.setFilterStr("Hello");
283 introspector.bind(handler, bus);
284
285 if (!obf.isUseSimpleMatcher()) {
286 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot"));
287 snapshot.setGraphAttribute("dpi", "70");
288 bus.takeSnapshot(snapshot);
289 }
290
291 bus.post("Hello");
292 bus.post("World");
293 bus.join();
294
295 assertEquals(1, handler.getCounter());
296 assertTrue(handler.isOk());
297 }
298 }
299
300
301 @Test
302 public void testTokenParameterizedStringHandlerWithMethodCondition() throws InterruptedException {
303 for (ObjectBusFactory obf: createObjectBus(null)) {
304 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
305
306 Map<String, String> map = new HashMap<String, String>();
307 map.put("filterStr", "\"Hello\"");
308 TokenSource ts = new MapTokenSource(map);
309 TokenExpander te = new TokenExpander(ts);
310
311 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, te);
312 TokenParameterizedStringHandlerWithMethodCondition handler = new TokenParameterizedStringHandlerWithMethodCondition();
313
314 introspector.bind(handler, bus);
315
316 if (!obf.isUseSimpleMatcher()) {
317 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\TokenParameterizedStringHandlerWithMethodCondition_"+obf.id+".dot"));
318 snapshot.setGraphAttribute("dpi", "70");
319 bus.takeSnapshot(snapshot);
320 }
321
322 bus.post("Hello");
323 bus.post("World");
324 bus.join();
325
326 assertEquals(1, handler.getCounter());
327 assertTrue(handler.isOk());
328 }
329 }
330
331 @Test
332 public void testPost() throws InterruptedException {
333 for (ObjectBusFactory obf: createObjectBus(null)) {
334
335 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
336
337 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
338 StringHandlerWithMethodCondition helloHandler = new StringHandlerWithMethodCondition();
339 introspector.bind(helloHandler, bus);
340
341 PostingHandler ph = new PostingHandler();
342 introspector.bind(ph, bus);
343
344 if (!obf.isUseSimpleMatcher()) {
345 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Posting_"+obf.id+".dot"));
346 snapshot.setGraphAttribute("dpi", "70");
347 bus.takeSnapshot(snapshot);
348 }
349
350 bus.post("World");
351 bus.join();
352
353 assertEquals(1, ph.getWorldCounter());
354 assertTrue(ph.isWorldOk());
355
356 assertEquals("OBF: "+obf, 1, ph.getEmCounter());
357 assertTrue(ph.isEmOk());
358
359 assertEquals(obf+", ", 1, helloHandler.getCounter());
360 assertTrue(helloHandler.isOk());
361 }
362
363 }
364
365 @Test
366 public void testHandleJoin() throws InterruptedException {
367 for (ObjectBusFactory obf: createObjectBus(null)) {
368 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
369 continue;
370 }
371
372 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
373
374 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
375 SlowPostingHandler ph = new SlowPostingHandler();
376 introspector.bind(ph, bus);
377
378 if (!obf.isUseSimpleMatcher()) {
379 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\HandleJoin_"+obf.id+".dot"));
380 snapshot.setGraphAttribute("dpi", "70");
381 bus.takeSnapshot(snapshot);
382 }
383
384 Handle<Object, Integer, Object, Long> worldHandle = bus.post("World");
385 worldHandle.join();
386
387 assertEquals(2, ph.getCounter());
388
389 Set<String> words = new HashSet<String>();
390 words.add("Hello");
391 words.add("World");
392 words.add("!");
393 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
394 assertTrue(words.remove(h.getEvent()));
395 }
396 assertTrue(words.isEmpty());
397 }
398 }
399
400 @Test
401 public void testJoin() throws InterruptedException {
402 for (ObjectBusFactory obf: createObjectBus(null)) {
403
404 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
405 continue;
406 }
407
408 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
409
410 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
411 JoinHandler joinHandler = new JoinHandler();
412 introspector.bind(joinHandler, bus);
413
414 if (!obf.isUseSimpleMatcher()) {
415 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot"));
416 snapshot.setGraphAttribute("dpi", "70");
417 bus.takeSnapshot(snapshot);
418 }
419
420 bus.post("Hello");
421 bus.post(20);
422 bus.post(1000);
423 bus.post("Good bye!");
424 bus.join();
425
426 // Browser snapshot
427 if (!obf.useSimpleMatcher) {
428 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\Joining_"+obf.id+".snapshot"));
429 bus.takeSnapshot(emfSnapshot);
430 }
431
432 assertEquals(1, joinHandler.getHelloCounter());
433 assertEquals(1, joinHandler.getJoinCounter());
434
435 assertTrue(joinHandler.isHelloOk());
436 assertTrue(joinHandler.isJoinOk());
437
438 // Derivation test
439 for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
440 Object event = h.getEvent();
441 if (String.valueOf(event).startsWith("Hello World")) {
442 Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
443 System.out.println(derivations);
444 }
445 }
446 }
447
448 }
449
450 @SuppressWarnings("unchecked")
451 @Test
452 public void testBinderCompiler() {
453 JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
454
455 compiler.compileJavaBinder(JoinHandler.class, LocalEventBus.class, null, null);
456 }
457
458 @Test
459 public void testFamilyTiesBindingCompilation() {
460 JavaBinderCompiler compiler = new JavaBinderCompiler(new File("generated.tests"));
461
462 compiler.compileJavaBinder(DaughterRule.class, FamilyTiesEventBus.class, null, null);
463 compiler.compileJavaBinder(GrandRules.class, FamilyTiesEventBus.class, null, null);
464 compiler.compileJavaBinder(ParentChildRules.class, FamilyTiesEventBus.class, null, null);
465 compiler.compileJavaBinder(ParentRules.class, FamilyTiesEventBus.class, null, null);
466 compiler.compileJavaBinder(SecondaryRules.class, FamilyTiesEventBus.class, null, null);
467 compiler.compileJavaBinder(SiblingRules.class, FamilyTiesEventBus.class, null, null);
468 compiler.compileJavaBinder(SonRule.class, FamilyTiesEventBus.class, null, null);
469 compiler.compileJavaBinder(SpouseRules.class, FamilyTiesEventBus.class, null, null);
470 }
471
472 @Test
473 public void testCompiledJoin() throws InterruptedException {
474 for (ObjectBusFactory obf: createObjectBus(null)) {
475 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
476 continue;
477 }
478 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
479
480 JoinHandler joinHandler = new JoinHandler();
481 JoinHandlerJavaBinder joinHandlerJavaBinder = new JoinHandlerJavaBinder();
482 joinHandlerJavaBinder.bind(joinHandler, bus);
483
484 if (!obf.isUseSimpleMatcher()) {
485 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\Joining_"+obf.id+".dot"));
486 snapshot.setGraphAttribute("dpi", "70");
487 bus.takeSnapshot(snapshot);
488 }
489
490 bus.post("Hello");
491 bus.post(20);
492 bus.post(1000);
493 bus.post("Good bye!");
494 bus.join();
495
496 if (!obf.isUseSimpleMatcher()) {
497 // Browser snapshot
498 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledJoining"+obf.id+".snapshot"));
499 bus.takeSnapshot(emfSnapshot);
500 }
501
502 assertEquals(1, joinHandler.getHelloCounter());
503 assertEquals(1, joinHandler.getJoinCounter());
504
505 assertTrue(joinHandler.isHelloOk());
506 assertTrue(joinHandler.isJoinOk());
507
508 // Derivation test
509 for (Handle<Object,Integer,Object,Long> h: bus.getStore()) {
510 Object event = h.getEvent();
511 if (String.valueOf(event).startsWith("Hello World")) {
512 Collection<Derivation<Object, Integer, Object>> derivations = bus.getDerivations(event);
513 System.out.println(derivations);
514 }
515 }
516 }
517 }
518
519 @Test
520 public void testJoinWithCost() throws InterruptedException {
521 for (ObjectBusFactory obf: createObjectBus(null)) {
522 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
523 continue;
524 }
525 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
526
527 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
528 JoinHandlerWithCost joinHandler = new JoinHandlerWithCost();
529 introspector.bind(joinHandler, bus);
530
531 if (!obf.isUseSimpleMatcher()) {
532 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\JoiningWithCost_"+obf.id+".dot"));
533 snapshot.setGraphAttribute("dpi", "70");
534 bus.takeSnapshot(snapshot);
535 }
536
537 bus.post("Hello");
538 bus.post(20);
539 bus.post(1000);
540 bus.post("World");
541 bus.join();
542
543 assertEquals(1, joinHandler.getJoinCounter());
544 assertTrue(joinHandler.isJoinOk());
545 }
546 }
547
548 @Test
549 public void testParameterNamesJoin() throws InterruptedException {
550 for (ObjectBusFactory obf: createObjectBus(null)) {
551 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
552
553 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
554 ParameterNamesJoinHandler joinHandler = new ParameterNamesJoinHandler();
555 introspector.bind(joinHandler, bus);
556
557 if (!obf.isUseSimpleMatcher()) {
558 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ParameterNamesJoining_"+obf.id+".dot"));
559 snapshot.setGraphAttribute("dpi", "70");
560 bus.takeSnapshot(snapshot);
561 }
562
563 bus.post("Hello");
564 bus.post(20);
565 bus.post(1000);
566 bus.post("Good bye!");
567 bus.join();
568
569 assertEquals(1, joinHandler.getHelloCounter());
570 assertEquals(1, joinHandler.getJoinCounter());
571
572 assertTrue(joinHandler.isHelloOk());
573 assertTrue(joinHandler.isJoinOk());
574 }
575
576 }
577
578 @Test
579 public void testPriority() throws InterruptedException {
580 for (ObjectBusFactory obf: createObjectBus(null)) {
581 if (obf.inferencePolicy.compareTo(InferencePolicy.AFTER_HANDLER)>=0) {
582 continue; // Priorities work only if inference commands are processed after handler or immediately.
583 }
584 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
585
586 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
587 PriorityHandler priorityHandler = new PriorityHandler();
588 introspector.bind(priorityHandler, bus);
589
590 bus.post("World");
591 bus.post("Hello");
592 bus.join();
593
594 assertEquals(obf+", ", 1, priorityHandler.getLpCounter());
595 assertEquals(obf+", ", 2, priorityHandler.getHpCounter());
596 assertEquals(obf+", ", 2, priorityHandler.getAhpCounter());
597 }
598 }
599
600 @Test
601 public void testOneOff() throws InterruptedException {
602 for (ObjectBusFactory obf: createObjectBus(null)) {
603 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
604
605 final AtomicInteger counter = new AtomicInteger();
606
607 LocalAbstractEventHandler<Object, Integer, Object> eh = new LocalAbstractEventHandler<Object, Integer, Object>(1, 0, null, false, true, Mode.POST) {
608
609
610 @Override
611 public void post(
612 EventDispatchContext<Object, Integer, Object, Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> context,
613 Object... events) {
614
615 counter.incrementAndGet();
616 }
617 };
618
619 bus.addHandler(eh);
620
621 bus.post("World");
622 bus.post("Hello");
623 bus.post("!");
624 bus.join();
625
626 assertEquals(1, counter.get());
627 }
628 }
629
630 @Test
631 public void testConsumeJoin() throws InterruptedException {
632 for (ObjectBusFactory obf: createObjectBus(null)) {
633 if (!obf.pkStore) {
634 continue;
635 }
636
637 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
638 continue;
639 }
640
641 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
642
643 OrderFulfiller orderFulfiller = new OrderFulfiller();
644 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
645 introspector.bind(orderFulfiller, bus);
646
647 if (!obf.isUseSimpleMatcher()) {
648 LocalDispatchNetworkDotSnapshot<Object, Integer, Object> snapshot = new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\ConsumeJoin_"+obf.id+".dot"));
649 snapshot.setGraphAttribute("dpi", "70");
650 bus.takeSnapshot(snapshot);
651 }
652
653 int totalNumberOfDishes = 800;
654 Kitchen kitchen = new Kitchen(totalNumberOfDishes, bus);
655
656 int totalNumberOfOrders = 500;
657 Cashier cashier = new Cashier(totalNumberOfOrders, bus);
658
659 kitchen.start();
660 cashier.start();
661
662 kitchen.join();
663 cashier.join();
664 bus.join();
665 Thread.sleep(5000);
666
667 assertTrue(orderFulfiller.isOK());
668
669 int fulfilledOrders = 0;
670 for (Order o: orderFulfiller.getFulfilledOrders()) {
671 assertTrue(o.isFulfilled());
672 ++fulfilledOrders;
673 }
674
675 int partialOrderDishes = 0;
676 int unfulfilledOrders = 0;
677 Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> orderSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Order.class);
678 for (Object event: bus.getStore().get(orderSelector, bus.getStore().getPrimaryKeyExtractor(), false, null)) {
679 Order order = (Order) event;
680 assertNotNull(order);
681 assertFalse(order.isFulfilled());
682 if (order.getMainDish()!=null) {
683 ++partialOrderDishes;
684 }
685 if (order.getSideDish()!=null) {
686 ++partialOrderDishes;
687 }
688 assertFalse(orderFulfiller.getFulfilledOrders().contains(order));
689 ++unfulfilledOrders;
690 }
691 assertEquals(totalNumberOfOrders, fulfilledOrders+unfulfilledOrders);
692
693 int remainingDishes = 0;
694 Predicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>> dishSelector = new InstanceOfPredicate<Handle<Object, Integer, Object, Long>, LocalEventStore<Object, Integer, Object>>(bus.getStore().getPrimaryKeyExtractor(), Dish.class);
695 for (Handle<Object, Integer, Object, Long> h: bus.getStore().get(dishSelector)) {
696 Dish dish = (Dish) h.getEvent();
697 assertNotNull(dish);
698 assertFalse(dish.isConsumed());
699 ++remainingDishes;
700 }
701 assertEquals(totalNumberOfDishes, fulfilledOrders*2+remainingDishes+partialOrderDishes);
702 }
703 }
704
705 @Test
706 public void testPredicateChaining() throws InterruptedException {
707 for (StringBusFactory obf: createStringBus(null)) {
708 LocalEventBus<String, Integer, Object> bus = obf.createBus();
709
710 LocalIntrospector<String, Object> introspector = new LocalIntrospector<String, Object>(null, null);
711 PredicateChainingHandler handler = new PredicateChainingHandler();
712 introspector.bind(handler, bus);
713
714 if (!obf.isUseSimpleMatcher()) {
715 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<String, Integer, Object>(new File("snapshots\\PredicateChainingHandler_"+obf.id+".dot")));
716 }
717
718 bus.post("Hello world!");
719 bus.join();
720
721 assertTrue(handler.isOneInvoked());
722 assertTrue(handler.isTwoInvoked());
723 }
724 }
725
726 @Test
727 public void testOppositeChaining() throws InterruptedException {
728 for (ObjectBusFactory obf: createObjectBus(null)) {
729 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
730
731 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
732 OppositeHandler handler = new OppositeHandler();
733 introspector.bind(handler, bus);
734
735 if (!obf.isUseSimpleMatcher()) {
736 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\OppositeHandler_"+obf.id+".dot")));
737 }
738
739 bus.post("Hello world!");
740 bus.join();
741
742 assertFalse(handler.isOneInvoked());
743 assertTrue(handler.isTwoInvoked());
744 }
745 }
746
747 @Test
748 public void testMoreRestrictive() throws InterruptedException {
749 for (ObjectBusFactory obf: createObjectBus(null)) {
750 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
751
752 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
753 MoreRestrictiveHandler handler = new MoreRestrictiveHandler();
754 introspector.bind(handler, bus);
755
756 if (!obf.isUseSimpleMatcher()) {
757 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\MoreRestrictiveHandler_"+obf.id+".dot")));
758 }
759
760 bus.post(Collections.singleton("Hello"));
761 bus.join();
762
763 assertTrue(handler.isCollectionInvoked());
764 assertFalse(handler.isListInvoked());
765 }
766 }
767
768 @Test
769 public void testLoop() throws InterruptedException {
770 DuplicatesFilter iFilter = new DuplicatesFilter();
771 for (ObjectBusFactory obf: createObjectBus(iFilter)) {
772 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
773
774 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
775 LoopHandler handler = new LoopHandler();
776 introspector.bind(handler, bus);
777
778 if (!obf.isUseSimpleMatcher()) {
779 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\LoopHandler_"+obf.id+".dot")));
780 }
781
782 bus.post("Hello");
783 bus.post("Good bye!");
784 bus.join();
785
786 assertTrue(handler.isHelloOk());
787 assertEquals(1, handler.getHelloCounter());
788
789 assertTrue(handler.isWorldOk());
790 assertEquals(1, handler.getWorldCounter());
791 }
792 }
793
794 @Test
795 public void testInifinite() throws InterruptedException {
796 InferenceFilter iFilter = new InferenceChainLengthFilter(100);
797 for (ObjectBusFactory obf: createObjectBus(iFilter)) {
798 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
799
800 bus.setMaxDerivationDepth(100);
801
802 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
803 InfiniteHandler handler = new InfiniteHandler();
804 introspector.bind(handler, bus);
805
806 if (!obf.isUseSimpleMatcher()) {
807 bus.takeSnapshot(new LocalDispatchNetworkDotSnapshot<Object, Integer, Object>(new File("snapshots\\InfiniteHandler_"+obf.id+".dot")));
808 }
809
810 Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
811 handle.join();
812 }
813 }
814
815 @Test(expected=EventDispatchException.class)
816 public void testFaulty() throws InterruptedException {
817 for (ObjectBusFactory obf: createObjectBus(null)) {
818 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
819
820 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
821 FaultyHandler handler = new FaultyHandler();
822 introspector.bind(handler, bus);
823
824 Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
825 handle.join();
826 }
827 }
828
829 @Test
830 public void testRemoveHandler() throws InterruptedException {
831 for (ObjectBusFactory obf: createObjectBus(null)) {
832 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
833
834 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
835 SimpleStringHandler handler = new SimpleStringHandler();
836 Set<Long> keys = introspector.bind(handler, bus);
837
838 Handle<Object, Integer, Object, Long> handle = bus.post("Hello");
839 handle.join();
840 assertEquals(1, handler.getCounter());
841
842 bus.removeHandlers(keys);
843
844 SimpleStringHandler handler2 = new SimpleStringHandler();
845 introspector.bind(handler2, bus);
846
847 Handle<Object, Integer, Object, Long> handle2 = bus.post("Good bye!");
848 handle2.join();
849 assertEquals(1, handler.getCounter());
850 assertEquals(1, handler2.getCounter());
851 }
852 }
853
854 @Test
855 public void testReset() throws InterruptedException {
856 for (ObjectBusFactory obf: createObjectBus(null)) {
857 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
858
859 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
860 StatefulHandler handler = new StatefulHandler();
861 Set<Long> keys = introspector.bind(handler, bus);
862
863 bus.post("Hello");
864 bus.post("1");
865 bus.join();
866
867 assertEquals(2, handler.getState().size());
868 int kbSize = 0;
869 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
870 ++kbSize;
871 }
872 assertEquals(2, kbSize);
873 assertEquals(0, handler.getResetCounter());
874
875 bus.reset();
876
877 assertEquals(0, handler.getState().size());
878 kbSize = 0;
879 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
880 ++kbSize;
881 }
882 assertEquals(0, kbSize);
883 assertEquals(2, handler.getResetCounter());
884 }
885 }
886
887 private static int predicateCounter;
888
889 /**
890 * Helper interface for intantiate bus for tests.
891 * @author Pavel Vlasov
892 *
893 * @param <E>
894 * @param <P>
895 * @param <C>
896 */
897 private class ObjectBusFactory {
898
899 private InferencePolicy inferencePolicy;
900 private boolean pkStore;
901 private boolean useExecutor;
902 private InferenceFilter iFilter;
903 private int id;
904 private boolean useSimpleMatcher;
905 private ObservableConverter<Object> observableConverter;
906
907 public ObjectBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
908 this.inferencePolicy = inferencePolicy;
909 this.pkStore = pkStore;
910 this.useExecutor = useExecutor;
911 this.iFilter = iFilter;
912 this.id = id;
913 this.useSimpleMatcher = useSimpleMatcher;
914 }
915
916 public boolean isUseExecutor() {
917 return useExecutor;
918 }
919
920 public boolean isUseSimpleMatcher() {
921 return useSimpleMatcher;
922 }
923
924 public InferencePolicy getInferencePolicy() {
925 return inferencePolicy;
926 }
927
928 @Override
929 public String toString() {
930 return "ObjectBusFactory [inferencePolicy=" + inferencePolicy
931 + ", pkStore=" + pkStore + ", useExecutor=" + useExecutor
932 + ", iFilter=" + iFilter + ", id=" + id + ", useSimpleMatcher="+useSimpleMatcher+"]";
933 }
934
935 public void setObservableConverter(ObservableConverter<Object> observableConverter) {
936 this.observableConverter = observableConverter;
937 }
938
939 LocalEventBus<Object, Integer, Object> createBus() {
940 LocalEventStoreImpl.Config<Object, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<Object, Integer, Object>();
941 if (!pkStore) {
942 storeConfig.setPrimaryKeyExtractor(null);
943 }
944 if (useExecutor) {
945 storeConfig.setExecutorService(executorService);
946 }
947 LocalEventStore<Object, Integer, Object> eventStore = new LocalEventStoreImpl<Object, Integer, Object>(storeConfig);
948
949 LocalEventBus.Config<Object, Integer, Object> busConfig = new LocalEventBus.Config<Object, Integer, Object>();
950 busConfig.setEventType(Object.class);
951 busConfig.setStore(eventStore);
952 if (useExecutor) {
953 busConfig.setExecutorService(executorService);
954 }
955 busConfig.setInferencePolicy(inferencePolicy);
956 busConfig.setInferenceFilter(iFilter);
957 if (useSimpleMatcher) {
958 busConfig.setMatcher(new LocalSimpleMatcher<Object, Integer, Object, LocalEventStore<Object,Integer,Object>>());
959 }
960
961 busConfig.setObservableConverter(observableConverter);
962
963 return new LocalEventBus<Object, Integer, Object>(busConfig) {
964
965 // /**
966 // * For troubleshooting.
967 // */
968 // public Long addHandler(
969 // EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
970 // LocalEventStore<Object,Integer,Object>> eventHandler,
971 // boolean oneOff,
972 // Predicate<Object,Object>... predicates) {
973 //
974 // String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
975 // for (int i=0; i<predicates.length; ++i) {
976 // PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));
977 // }
978 //
979 // return super.addHandler(eventHandler, oneOff, predicates);
980 // }
981
982 };
983 }
984
985 }
986 /**
987 * Helper interface for instantiate bus for tests.
988 * @author Pavel Vlasov
989 *
990 * @param <E>
991 * @param <P>
992 * @param <C>
993 */
994 private class StringBusFactory {
995
996 private InferencePolicy inferencePolicy;
997 private boolean pkStore;
998 private boolean useExecutor;
999 private InferenceFilter iFilter;
1000 private int id;
1001 private boolean useSimpleMatcher;
1002
1003 public StringBusFactory(InferencePolicy inferencePolicy, InferenceFilter iFilter, boolean pkStore, boolean useExecutor, int id, boolean useSimpleMatcher) {
1004 this.inferencePolicy = inferencePolicy;
1005 this.pkStore = pkStore;
1006 this.useExecutor = useExecutor;
1007 this.iFilter = iFilter;
1008 this.id = id;
1009 this.useSimpleMatcher = useSimpleMatcher;
1010 }
1011
1012 public boolean isUseExecutor() {
1013 return useExecutor;
1014 }
1015
1016 public boolean isUseSimpleMatcher() {
1017 return useSimpleMatcher;
1018 }
1019
1020 LocalEventBus<String, Integer, Object> createBus() {
1021 LocalEventStoreImpl.Config<String, Integer, Object> storeConfig = new LocalEventStoreImpl.Config<String, Integer, Object>();
1022 if (!pkStore) {
1023 storeConfig.setPrimaryKeyExtractor(null);
1024 }
1025 if (useExecutor) {
1026 storeConfig.setExecutorService(executorService);
1027 }
1028 LocalEventStore<String, Integer, Object> eventStore = new LocalEventStoreImpl<String, Integer, Object>(storeConfig);
1029
1030 LocalEventBus.Config<String, Integer, Object> busConfig = new LocalEventBus.Config<String, Integer, Object>();
1031 busConfig.setEventType(String.class);
1032 busConfig.setStore(eventStore);
1033 if (useExecutor) {
1034 busConfig.setExecutorService(executorService);
1035 }
1036 busConfig.setInferencePolicy(inferencePolicy);
1037 busConfig.setInferenceFilter(iFilter);
1038 if (useSimpleMatcher) {
1039 busConfig.setMatcher(new LocalSimpleMatcher<String, Integer, Object, LocalEventStore<String,Integer,Object>>());
1040 }
1041
1042 return new LocalEventBus<String, Integer, Object>(busConfig) {
1043
1044 // /**
1045 // * For troubleshooting.
1046 // */
1047 // public Long addHandler(
1048 // EventHandler<Object,Integer,Object,AbstractEventBus.Handle<Object,Integer,Object,Long>,
1049 // LocalEventStore<Object,Integer,Object>> eventHandler,
1050 // boolean oneOff,
1051 // Predicate<Object,Object>... predicates) {
1052 //
1053 // String timestamp = Long.toString(System.currentTimeMillis(), Character.MAX_RADIX);
1054 // for (int i=0; i<predicates.length; ++i) {
1055 // PredicateOutput.output(predicates[i], new File("snapshots\\Predicate-"+(++i)+"-"+timestamp+"-"+i+".snapshot"));
1056 // }
1057 //
1058 // return super.addHandler(eventHandler, oneOff, predicates);
1059 // }
1060
1061 };
1062 }
1063
1064 }
1065
1066 private Iterable<ObjectBusFactory> createObjectBus(InferenceFilter iFilter) {
1067 int counter = 0;
1068 ArrayList<ObjectBusFactory> ret = new ArrayList<ObjectBusFactory>();
1069 for (InferencePolicy ip: InferencePolicy.values()) {
1070 ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, true));
1071 ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, true));
1072 ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, true));
1073 ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, true));
1074
1075 ret.add(new ObjectBusFactory(ip, iFilter, false, false, ++counter, false));
1076 ret.add(new ObjectBusFactory(ip, iFilter, false, true, ++counter, false));
1077 ret.add(new ObjectBusFactory(ip, iFilter, true, false, ++counter, false));
1078 ret.add(new ObjectBusFactory(ip, iFilter, true, true, ++counter, false));
1079 }
1080 return ret;
1081 }
1082
1083 private Iterable<StringBusFactory> createStringBus(InferenceFilter iFilter) {
1084 int counter = 0;
1085 ArrayList<StringBusFactory> ret = new ArrayList<StringBusFactory>();
1086 for (InferencePolicy ip: InferencePolicy.values()) {
1087 ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, true));
1088 ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, true));
1089 ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, true));
1090 ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, true));
1091
1092 ret.add(new StringBusFactory(ip, iFilter, false, false, ++counter, false));
1093 ret.add(new StringBusFactory(ip, iFilter, false, true, ++counter, false));
1094 ret.add(new StringBusFactory(ip, iFilter, true, false, ++counter, false));
1095 ret.add(new StringBusFactory(ip, iFilter, true, true, ++counter, false));
1096 }
1097 return ret;
1098 }
1099
1100 @Test
1101 public void testFamilyTies() throws InterruptedException {
1102 int counter = 0;
1103 for (InferencePolicy ip: InferencePolicy.values()) {
1104 if (ip==InferencePolicy.IMMEDIATELY) {
1105 continue;
1106 }
1107
1108 _testFamilyTies(ip, false, false, ++counter, true);
1109 _testFamilyTies(ip, false, true, ++counter, true);
1110 _testFamilyTies(ip, true, false, ++counter, true);
1111 _testFamilyTies(ip, true, true, ++counter, true);
1112
1113 _testFamilyTies(ip, false, false, ++counter, false);
1114 _testFamilyTies(ip, false, true, ++counter, false);
1115 _testFamilyTies(ip, true, false, ++counter, false);
1116 _testFamilyTies(ip, true, true, ++counter, false);
1117 }
1118 }
1119
1120 private void _testFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {
1121 // Create bus
1122 FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1123 if (useExecutor) {
1124 storeConfig.setExecutorService(executorService);
1125 }
1126 if (!pkStore) {
1127 storeConfig.setPrimaryKeyExtractor(null);
1128 }
1129 FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1130
1131 FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1132 busConfig.setStore(eventStore);
1133 if (useExecutor) {
1134 busConfig.setExecutorService(executorService);
1135 }
1136 if (useSimpleMatcher) {
1137 busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1138 }
1139
1140 busConfig.setInferencePolicy(iPolicy);
1141 busConfig.setInferenceFilter(new DuplicatesFilter());
1142
1143 FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1144
1145 FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1146
1147 // Bind rules.
1148 introspector.bind(new DaughterRule(), bus);
1149 introspector.bind(new GrandRules(), bus);
1150 introspector.bind(new ParentChildRules(), bus);
1151 introspector.bind(new ParentRules(), bus);
1152 introspector.bind(new SecondaryRules(), bus);
1153 introspector.bind(new SiblingRules(), bus);
1154 introspector.bind(new SonRule(), bus);
1155 introspector.bind(new SpouseRules(), bus);
1156
1157 if (!useSimpleMatcher) {
1158 // Take dot snapshot
1159 bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_"+id+".dot")));
1160 }
1161
1162 // Post seed relationships
1163 Person kate = new Person("Kate", 58, false);
1164 Person victor = new Person("Victor", 63, true);
1165 bus.post(new Spouse(kate, victor));
1166
1167 Person peter = new Person("Peter", 37, true);
1168 bus.post(new Child(peter, kate));
1169 bus.post(new Child(peter, victor));
1170
1171 Person alison = new Person("Alison", 36, false);
1172 bus.post(new Spouse(peter, alison));
1173
1174 Person lucy = new Person("Lucy", 17, false);
1175 bus.post(new Child(lucy, alison));
1176
1177 Person nancy = new Person("Nancy", 14, false);
1178 bus.post(new Child(nancy, peter));
1179
1180 Person dan = new Person("Dan", 7, true);
1181 bus.post(new Child(dan, peter));
1182 bus.post(new Child(dan, alison));
1183
1184 Person audrey = new Person("Audrey", 4, false);
1185 bus.post(new Child(audrey, peter));
1186 bus.post(new Child(audrey, alison));
1187
1188 Person tanya = new Person("Tanya", 31, false);
1189 Person max = new Person("Max", 32, true);
1190 bus.post(new Spouse(tanya, max));
1191 bus.post(new Child(tanya, kate));
1192 bus.post(new Child(tanya, victor));
1193
1194 Person vilma = new Person("Vilma", 14, false);
1195 bus.post(new Child(vilma, tanya));
1196
1197 Person george = new Person("George", 10, true);
1198 bus.post(new Child(george, tanya));
1199
1200 Person lisa = new Person("Lisa", 5, false);
1201 bus.post(new Child(lisa, tanya));
1202 bus.post(new Child(lisa, max));
1203
1204 bus.join();
1205
1206 // Take browser snapshot
1207 if (!useSimpleMatcher) {
1208 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\FamilyTies_"+id+".snapshot"));
1209 bus.takeSnapshot(emfSnapshot);
1210 }
1211
1212 int numberOfLisaRelatives = 0;
1213 for (Relative r: bus.getStore().getRelatives(lisa)) {
1214 ++numberOfLisaRelatives;
1215 }
1216 assertEquals(11, numberOfLisaRelatives);
1217
1218 for (Relative relative: bus.getStore().getRelatives(lisa)) {
1219 System.out.println(relative);
1220 }
1221 }
1222
1223 @Test
1224 public void testCompiledFamilyTies() throws InterruptedException {
1225 int counter = 0;
1226 for (InferencePolicy ip: InferencePolicy.values()) {
1227 if (ip==InferencePolicy.IMMEDIATELY) {
1228 continue;
1229 }
1230 _testCompiledFamilyTies(ip, false, false, ++counter, true);
1231 _testCompiledFamilyTies(ip, false, true, ++counter, true);
1232 _testCompiledFamilyTies(ip, true, false, ++counter, true);
1233 _testCompiledFamilyTies(ip, true, true, ++counter, true);
1234
1235 _testCompiledFamilyTies(ip, false, false, ++counter, false);
1236 _testCompiledFamilyTies(ip, false, true, ++counter, false);
1237 _testCompiledFamilyTies(ip, true, false, ++counter, false);
1238 _testCompiledFamilyTies(ip, true, true, ++counter, false);
1239 }
1240 }
1241
1242 private void _testCompiledFamilyTies(InferencePolicy iPolicy, boolean useExecutor, boolean pkStore, int id, boolean useSimpleMatcher) throws InterruptedException {
1243 // Create bus
1244 // Create bus
1245 FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1246 if (useExecutor) {
1247 storeConfig.setExecutorService(executorService);
1248 }
1249 if (!pkStore) {
1250 storeConfig.setPrimaryKeyExtractor(null);
1251 }
1252 FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1253
1254 FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1255 busConfig.setStore(eventStore);
1256 if (useExecutor) {
1257 busConfig.setExecutorService(executorService);
1258 }
1259 busConfig.setInferencePolicy(iPolicy);
1260 busConfig.setInferenceFilter(new DuplicatesFilter());
1261
1262 if (useSimpleMatcher) {
1263 busConfig.setMatcher(new LocalSimpleMatcher<Relative, Integer, FamilyTiesRules, FamilyTiesEventStore>());
1264 }
1265
1266 FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1267
1268 // Bind rules.
1269 new DaughterRuleJavaBinder().bind(new DaughterRule(), bus);
1270 new GrandRulesJavaBinder().bind(new GrandRules(), bus);
1271 new ParentChildRulesJavaBinder().bind(new ParentChildRules(), bus);
1272 new ParentRulesJavaBinder().bind(new ParentRules(), bus);
1273 new SecondaryRulesJavaBinder().bind(new SecondaryRules(), bus);
1274 new SiblingRulesJavaBinder().bind(new SiblingRules(), bus);
1275 new SonRuleJavaBinder().bind(new SonRule(), bus);
1276 new SpouseRulesJavaBinder().bind(new SpouseRules(), bus);
1277
1278 if (!useSimpleMatcher) {
1279 // Take dot snapshot
1280 bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\CompiledFamilyTies_"+id+".dot")));
1281 }
1282
1283 // Post seed relationships
1284 Person kate = new Person("Kate", 58, false);
1285 Person victor = new Person("Victor", 63, true);
1286 bus.post(new Spouse(kate, victor));
1287
1288 Person peter = new Person("Peter", 37, true);
1289 bus.post(new Child(peter, kate));
1290 bus.post(new Child(peter, victor));
1291
1292 Person alison = new Person("Alison", 36, false);
1293 bus.post(new Spouse(peter, alison));
1294
1295 Person lucy = new Person("Lucy", 17, false);
1296 bus.post(new Child(lucy, alison));
1297
1298 Person nancy = new Person("Nancy", 14, false);
1299 bus.post(new Child(nancy, peter));
1300
1301 Person dan = new Person("Dan", 7, true);
1302 bus.post(new Child(dan, peter));
1303 bus.post(new Child(dan, alison));
1304
1305 Person audrey = new Person("Audrey", 4, false);
1306 bus.post(new Child(audrey, peter));
1307 bus.post(new Child(audrey, alison));
1308
1309 Person tanya = new Person("Tanya", 31, false);
1310 Person max = new Person("Max", 32, true);
1311 bus.post(new Spouse(tanya, max));
1312 bus.post(new Child(tanya, kate));
1313 bus.post(new Child(tanya, victor));
1314
1315 Person vilma = new Person("Vilma", 14, false);
1316 bus.post(new Child(vilma, tanya));
1317
1318 Person george = new Person("George", 10, true);
1319 bus.post(new Child(george, tanya));
1320
1321 Person lisa = new Person("Lisa", 5, false);
1322 bus.post(new Child(lisa, tanya));
1323 bus.post(new Child(lisa, max));
1324
1325 bus.join();
1326
1327 if (!useSimpleMatcher) {
1328 // Take browser snapshot
1329 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\CompiledFamilyTies.snapshot"));
1330 bus.takeSnapshot(emfSnapshot);
1331 }
1332
1333 int numberOfLisaRelatives = 0;
1334 for (Relative r: bus.getStore().getRelatives(lisa)) {
1335 ++numberOfLisaRelatives;
1336 }
1337 assertEquals(11, numberOfLisaRelatives);
1338
1339 for (Relative relative: bus.getStore().getRelatives(lisa)) {
1340 System.out.println(relative);
1341 }
1342 }
1343
1344 @Test
1345 public void testFamilyTiesJmx() throws InterruptedException {
1346
1347 // Create bus
1348 FamilyTiesEventStoreImpl.Config storeConfig = new FamilyTiesEventStoreImpl.Config();
1349 storeConfig.setExecutorService(executorService);
1350 FamilyTiesEventStore eventStore = new FamilyTiesEventStoreImpl(storeConfig);
1351
1352 FamilyTiesEventBus.Config busConfig = new FamilyTiesEventBus.Config();
1353 busConfig.setStore(eventStore);
1354 busConfig.setExecutorService(executorService);
1355 busConfig.setInferenceFilter(new DuplicatesFilter());
1356
1357 JmxStatsCollector jsc = new JmxStatsCollector(ManagementFactory.getPlatformMBeanServer(), "Hammurapi Group:root=Family Ties");
1358 busConfig.setStatsCollector(jsc);
1359
1360 FamilyTiesEventBus bus = new FamilyTiesEventBus(busConfig);
1361
1362 Thread.sleep(40000);
1363
1364 FamilyTiesIntrospector introspector = new FamilyTiesIntrospector(null, null);
1365
1366 // Bind rules.
1367 introspector.bind(new DaughterRule(), bus);
1368 introspector.bind(new GrandRules(), bus);
1369 introspector.bind(new ParentChildRules(), bus);
1370 introspector.bind(new ParentRules(), bus);
1371 introspector.bind(new SecondaryRules(), bus);
1372 introspector.bind(new SiblingRules(), bus);
1373 introspector.bind(new SonRule(), bus);
1374 introspector.bind(new SpouseRules(), bus);
1375
1376 // Take dot snapshot
1377 bus.takeSnapshot(new FamilyTiesDispatchNetworkDotSnapshot(new File("snapshots\\FamilyTies_Jmx.dot")));
1378
1379 // Post seed relationships
1380 Person kate = new Person("Kate", 58, false);
1381 Person victor = new Person("Victor", 63, true);
1382 bus.post(new Spouse(kate, victor));
1383 Thread.sleep(2000);
1384
1385 Person peter = new Person("Peter", 37, true);
1386 bus.post(new Child(peter, kate));
1387 Thread.sleep(2000);
1388 bus.post(new Child(peter, victor));
1389 Thread.sleep(2000);
1390
1391 Person alison = new Person("Alison", 36, false);
1392 bus.post(new Spouse(peter, alison));
1393 Thread.sleep(2000);
1394
1395 Person lucy = new Person("Lucy", 17, false);
1396 bus.post(new Child(lucy, alison));
1397 Thread.sleep(2000);
1398
1399 Person nancy = new Person("Nancy", 14, false);
1400 bus.post(new Child(nancy, peter));
1401 Thread.sleep(2000);
1402
1403 Person dan = new Person("Dan", 7, true);
1404 bus.post(new Child(dan, peter));
1405 Thread.sleep(2000);
1406 bus.post(new Child(dan, alison));
1407 Thread.sleep(2000);
1408
1409 Person audrey = new Person("Audrey", 4, false);
1410 bus.post(new Child(audrey, peter));
1411 Thread.sleep(2000);
1412 bus.post(new Child(audrey, alison));
1413 Thread.sleep(2000);
1414
1415 Person tanya = new Person("Tanya", 31, false);
1416 Person max = new Person("Max", 32, true);
1417 bus.post(new Spouse(tanya, max));
1418 Thread.sleep(2000);
1419 bus.post(new Child(tanya, kate));
1420 Thread.sleep(2000);
1421 bus.post(new Child(tanya, victor));
1422 Thread.sleep(2000);
1423
1424 Person vilma = new Person("Vilma", 14, false);
1425 bus.post(new Child(vilma, tanya));
1426 Thread.sleep(2000);
1427
1428 Person george = new Person("George", 10, true);
1429 bus.post(new Child(george, tanya));
1430 Thread.sleep(2000);
1431
1432 Person lisa = new Person("Lisa", 5, false);
1433 bus.post(new Child(lisa, tanya));
1434 Thread.sleep(2000);
1435 bus.post(new Child(lisa, max));
1436 Thread.sleep(2000);
1437
1438 Thread.sleep(30000);
1439
1440 bus.join();
1441
1442 }
1443
1444 // --- Remove tests ---
1445
1446 /**
1447 * - Post event to the bus
1448 * - Check presence
1449 * - Remove event using bus.remove()
1450 * - Check that event is gone from the store.
1451 */
1452 @Test
1453 public void testRemoveWithBus() throws Exception {
1454 for (ObjectBusFactory obf: createObjectBus(null)) {
1455 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1456
1457 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1458
1459 PostingHandler ph = new PostingHandler();
1460 introspector.bind(ph, bus);
1461
1462 bus.post("World");
1463 bus.join();
1464
1465 assertEquals(1, ph.getWorldCounter());
1466 assertTrue(ph.isWorldOk());
1467
1468 assertEquals(1, ph.getEmCounter());
1469 assertTrue(ph.isEmOk());
1470
1471 Set<String> expected = new HashSet<String>();
1472 expected.add("Hello");
1473 expected.add("World");
1474 expected.add("!");
1475
1476 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1477 if (!expected.remove(h.getEvent())) {
1478 fail("Unexpected object in the store: "+h.getEvent());
1479 }
1480 }
1481
1482 if (!expected.isEmpty()) {
1483 fail("Not present in the store: "+expected);
1484 }
1485
1486 bus.remove("World");
1487 bus.join();
1488
1489 for (Object o: bus.getStore()) {
1490 fail("Unexpected object in the store: "+o);
1491 }
1492 }
1493 }
1494
1495 /**
1496 * - Post event to the bus
1497 * - Check presence
1498 * - Remove event using Handle.remove()
1499 * - Check that event is gone from the store.
1500 */
1501 @Test
1502 public void testRemoveWithHandle() throws Exception {
1503 for (ObjectBusFactory obf: createObjectBus(null)) {
1504 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1505
1506 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1507
1508 PostingHandler ph = new PostingHandler();
1509 introspector.bind(ph, bus);
1510
1511 Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1512 bus.join();
1513
1514 assertEquals(1, ph.getWorldCounter());
1515 assertTrue(ph.isWorldOk());
1516
1517 assertEquals(1, ph.getEmCounter());
1518 assertTrue(ph.isEmOk());
1519
1520 Set<String> expected = new HashSet<String>();
1521 expected.add("Hello");
1522 expected.add("World");
1523 expected.add("!");
1524
1525 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1526 if (!expected.remove(h.getEvent())) {
1527 fail("Unexpected object in the store: "+h.getEvent());
1528 }
1529 }
1530
1531 if (!expected.isEmpty()) {
1532 fail("Not present in the store: "+expected);
1533 }
1534
1535 wHandle.remove();
1536 bus.join();
1537
1538 for (Object o: bus.getStore()) {
1539 fail("Unexpected object in the store: "+o);
1540 }
1541 }
1542 }
1543
1544 /**
1545 * - Post event
1546 * - Validate that remove handler not fired
1547 * - Remove event
1548 * - Validate event handler fired
1549 * @throws Exception
1550 */
1551 @Test
1552 public void testRemoveHandlers() throws Exception {
1553 for (ObjectBusFactory obf: createObjectBus(null)) {
1554 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1555
1556 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1557
1558 PostingHandler ph = new PostingHandler();
1559 introspector.bind(ph, bus);
1560
1561 RemoveHandler rh = new RemoveHandler();
1562 introspector.bind(rh, bus);
1563 if (!obf.useSimpleMatcher) {
1564 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandlers_"+obf.id+".snapshot"));
1565 bus.takeSnapshot(emfSnapshot);
1566
1567 takeSnapshot(bus, "snapshots\\RemoveHandlers_"+obf.id+".dot");
1568 }
1569
1570 Handle<Object, Integer, Object, Long> wHandle = bus.post("World");
1571 bus.join();
1572
1573 assertEquals(1, ph.getWorldCounter());
1574 assertTrue(ph.isWorldOk());
1575
1576 assertEquals(1, ph.getEmCounter());
1577 assertTrue(ph.isEmOk());
1578
1579 assertEquals(0, rh.getWorldCounter());
1580 assertEquals(0, rh.getEmCounter());
1581
1582 Set<String> expected = new HashSet<String>();
1583 expected.add("Hello");
1584 expected.add("World");
1585 expected.add("!");
1586
1587 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1588 if (!expected.remove(h.getEvent())) {
1589 fail("Unexpected object in the store: "+h.getEvent());
1590 }
1591 }
1592
1593 if (!expected.isEmpty()) {
1594 fail("Not present in the store: "+expected);
1595 }
1596
1597 wHandle.remove();
1598 bus.join();
1599
1600 int counter=0;
1601 for (Handle<Object, Integer, Object, Long> h: bus.getStore()) {
1602 ++counter;
1603 if (!"Bye!".equals(h.getEvent())) {
1604 fail("Unexpected object in the store: "+h.getEvent());
1605 }
1606 }
1607 assertEquals(1, ph.getWorldCounter());
1608 assertTrue(ph.isWorldOk());
1609
1610 assertEquals(1, ph.getEmCounter());
1611 assertTrue(ph.isEmOk());
1612
1613 assertEquals(1, rh.getWorldCounter());
1614 assertTrue(rh.isWorldOk());
1615
1616 assertEquals(1, rh.getEmCounter());
1617 assertTrue(rh.isEmOk());
1618
1619 assertEquals(String.valueOf(obf), 1, counter);
1620 }
1621 }
1622
1623 /**
1624 * - Post 1st event, validate not fired
1625 * - Post 2nd event, validate not fired
1626 * - Remove 2nd event, validate fired
1627 * - Remove 1st event, validate not fired
1628 * @throws Exception
1629 */
1630 @Test
1631 public void testFireJoinRemoveHandler() throws Exception {
1632 for (ObjectBusFactory obf: createObjectBus(null)) {
1633 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1634
1635 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1636
1637 JoinRemoveHandler jrh = new JoinRemoveHandler();
1638 introspector.bind(jrh, bus);
1639
1640 if (!obf.useSimpleMatcher) {
1641 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\JoinRemoveHandler_"+obf.id+".snapshot"));
1642 bus.takeSnapshot(emfSnapshot);
1643
1644 takeSnapshot(bus, "snapshots\\JoinRemoveHandler_"+obf.id+".dot");
1645 }
1646
1647 bus.post("World");
1648 bus.join();
1649
1650 assertEquals(0, jrh.getJoinCounter());
1651
1652 bus.post("Hello");
1653 bus.join();
1654
1655 assertEquals(0, jrh.getJoinCounter());
1656
1657 bus.remove("Hello");
1658 bus.join();
1659
1660 assertEquals(1, jrh.getJoinCounter());
1661 assertTrue(jrh.isJoinOk());
1662
1663 bus.remove("World");
1664 bus.join();
1665
1666 assertEquals(1, jrh.getJoinCounter());
1667 assertTrue(jrh.isJoinOk());
1668 }
1669 }
1670
1671 /**
1672 * - Post event which matcher remove handler 1
1673 * - Update event with value which matches remove handler 2
1674 * - Verify that remove handler 1 is fired
1675 * - Remove event
1676 * - Verify that remove handler 2 is fired
1677 * @throws Exception
1678 */
1679 @Test
1680 public void testUpdateFireRemoveHandlers() throws Exception {
1681 for (ObjectBusFactory obf: createObjectBus(null)) {
1682 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1683 if (obf.pkStore) {
1684 continue; // No PK stores - events are mutable.
1685 }
1686
1687 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1688
1689 RemoveHandler2 rh = new RemoveHandler2();
1690 introspector.bind(rh, bus);
1691
1692 HelperHandler hh = new HelperHandler();
1693 introspector.bind(hh, bus);
1694
1695 if (!obf.useSimpleMatcher) {
1696 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1697 bus.takeSnapshot(emfSnapshot);
1698
1699 takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1700 }
1701
1702 AtomicReference<String> aRef = new AtomicReference<String>("World");
1703
1704 rh.setExpectedWorld(aRef);
1705
1706 Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1707 bus.join();
1708
1709 assertEquals(0, rh.getWorldCounter());
1710 assertEquals(0, rh.getEmCounter());
1711
1712 assertEquals(1, hh.getWorldCounter());
1713 assertTrue(hh.isWorldOk());
1714
1715 assertEquals(0, hh.getEmCounter());
1716
1717 aRef.set("!");
1718 bus.join();
1719 assertEquals(0, rh.getWorldCounter());
1720 assertEquals(0, rh.getEmCounter());
1721
1722 assertEquals(1, hh.getWorldCounter());
1723 assertTrue(hh.isWorldOk());
1724
1725 assertEquals(0, hh.getEmCounter());
1726
1727 h.update();
1728 bus.join();
1729 assertEquals(1, rh.getWorldCounter());
1730 assertTrue(rh.isWorldOk());
1731 assertEquals(0, rh.getEmCounter());
1732
1733 assertEquals(1, hh.getWorldCounter());
1734 assertTrue(hh.isWorldOk());
1735
1736 assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1737 assertTrue(hh.isEmOk());
1738
1739 h.remove();
1740 bus.join();
1741 assertEquals(1, rh.getWorldCounter());
1742 assertTrue(rh.isWorldOk());
1743 assertEquals(1, rh.getEmCounter());
1744 assertTrue(rh.isEmOk());
1745
1746 assertEquals(1, hh.getWorldCounter());
1747 assertTrue(hh.isWorldOk());
1748
1749 assertEquals(1, hh.getEmCounter());
1750 assertTrue(hh.isEmOk());
1751
1752 }
1753 }
1754
1755 /**
1756 * Update event through dispatch context. Verify that remove
1757 * handlers for old value are fired and that post handlers for
1758 * new value are fired. Use join handler or AFTER_EVENT policy.
1759 * @throws Exception
1760 */
1761 @Test
1762 public void testUpdateFromHandler() throws Exception {
1763 for (ObjectBusFactory obf: createObjectBus(null)) {
1764 LocalEventBus<Object, Integer, Object> bus = obf.createBus();
1765 if (obf.pkStore) {
1766 continue; // No PK stores - events are mutable.
1767 }
1768
1769 if (obf.getInferencePolicy() == InferencePolicy.IMMEDIATELY) {
1770 continue;
1771 }
1772
1773 LocalIntrospector<Object, Object> introspector = new LocalIntrospector<Object, Object>(null, null);
1774
1775 RemoveHandler2 rh = new RemoveHandler2();
1776 introspector.bind(rh, bus);
1777
1778 HelperHandler2 hh = new HelperHandler2();
1779 introspector.bind(hh, bus);
1780
1781 if (!obf.useSimpleMatcher) {
1782 SnapshotOutput emfSnapshot = new SnapshotOutput(new File("snapshots\\RemoveHandler2_"+obf.id+".snapshot"));
1783 bus.takeSnapshot(emfSnapshot);
1784
1785 takeSnapshot(bus, "snapshots\\RemoveHandler2_"+obf.id+".dot");
1786 }
1787
1788 AtomicReference<String> aRef = new AtomicReference<String>("World");
1789
1790 rh.setExpectedWorld(aRef);
1791
1792 Handle<Object, Integer, Object, Long> h = bus.post(aRef);
1793 bus.join();
1794
1795 assertEquals(0, rh.getWorldCounter());
1796 assertEquals(0, rh.getEmCounter());
1797
1798 assertEquals(1, hh.getWorldCounter());
1799 assertTrue(hh.isWorldOk());
1800
1801 assertEquals(0, hh.getEmCounter());
1802
1803 bus.post("Hello"); // Hello fires join handler with World, which updates World to !
1804 bus.join();
1805 assertEquals(1, hh.getJoinCounter());
1806 assertTrue(hh.isJoinOk());
1807
1808 assertEquals(1, rh.getWorldCounter());
1809 assertTrue(rh.isWorldOk());
1810 assertEquals(0, rh.getEmCounter());
1811
1812 assertEquals(1, hh.getWorldCounter());
1813 assertTrue(hh.isWorldOk());
1814
1815 assertEquals(String.valueOf(obf), 1, hh.getEmCounter());
1816 assertTrue(hh.isEmOk());
1817
1818 h.remove();
1819 bus.join();
1820 assertEquals(1, rh.getWorldCounter());
1821 assertTrue(rh.isWorldOk());
1822 assertEquals(1, rh.getEmCounter());
1823 assertTrue(rh.isEmOk());
1824
1825 assertEquals(1, hh.getWorldCounter());
1826 assertTrue(hh.isWorldOk());
1827
1828 assertEquals(1, hh.getEmCounter());
1829 assertTrue(hh.isEmOk());
1830
1831 }
1832 }
1833
1834 public static class ObservableStringReference implements Observable<ObservableStringReference> {
1835
1836 public ObservableStringReference(String value) {
1837 super();
1838 this.value = value;
1839 }
1840
1841 private Collection<Observer<? super ObservableStringReference>> observers = new ArrayList<Observer<? super ObservableStringReference>>();
1842
1843 @Override
1844 public synchronized void addObserver(Observer<? super ObservableStringReference> o) {
1845 observers.add(o);
1846 }
1847
1848 @Override
1849 public synchronized void deleteObserver(Observer<? super ObservableStringReference> o) {
1850 observers.remove(o);
1851 }
1852
1853 public Collection<Observer<? super ObservableStringReference>> getObservers() {
1854 return observers;
1855 }
1856
1857 private volatile String value;
1858
1859 public synchronized void set(String value) {
1860 this.value = value;
1861 for (Observer<? super ObservableStringReference> o: observers) {
1862 o.update(this);
1863 }
1864 }
1865
1866 public String get() {
1867 return value;
1868 }
1869
1870
1871 };
1872
/**
 * Same scenario as the handler-driven update test, but the update is signalled
 * by the event itself through the observable mechanism:
 * - Post an observable event which matches handler 1, verify it fired and that
 *   exactly one observer got registered with the event.
 * - Change the event value so it matches handler 2, verify that handler 2 is fired.
 * - Remove the event, verify removal notifications and that the observer is deregistered.
 * @throws Exception
 */
@Test
public void testUpdateThroughObservable() throws Exception {
    // Converter which passes observables through as is and yields null for everything else.
    ObservableConverter<Object> converter = new ObservableConverter<Object>() {

        @Override
        public Observable<Object> convert(Object obj) {
            if (obj instanceof Observable) {
                return (Observable<Object>) obj;
            }
            return null;
        }

    };
    for (ObjectBusFactory factory: createObjectBus(null)) {
        factory.setObservableConverter(converter);
        LocalEventBus<Object, Integer, Object> eventBus = factory.createBus();
        if (factory.pkStore) {
            continue; // No PK stores - events are mutable.
        }

        LocalIntrospector<Object, Object> binder = new LocalIntrospector<Object, Object>(null, null);

        RemoveHandler2o removeHandler = new RemoveHandler2o();
        binder.bind(removeHandler, eventBus);

        HelperHandler_o helperHandler = new HelperHandler_o();
        binder.bind(helperHandler, eventBus);

        // Dump the dispatch network (snapshot + dot) unless the simple matcher is in use.
        if (!factory.useSimpleMatcher) {
            String snapshotBase = "snapshots\\UpdateThroughObservable_"+factory.id;
            SnapshotOutput snapshotOutput = new SnapshotOutput(new File(snapshotBase+".snapshot"));
            eventBus.takeSnapshot(snapshotOutput);

            takeSnapshot(eventBus, snapshotBase+".dot");
        }

        ObservableStringReference worldRef = new ObservableStringReference("World");

        removeHandler.setExpectedWorld(worldRef);

        // Nothing observes the reference before it is posted.
        assertTrue(worldRef.getObservers().isEmpty());

        // Step 1: post the observable; one observer is expected to be attached to it.
        Handle<Object, Integer, Object, Long> worldHandle = eventBus.post(worldRef);
        eventBus.join();

        assertEquals(1, worldRef.getObservers().size());

        assertEquals(0, removeHandler.getWorldCounter());
        assertEquals(0, removeHandler.getEmCounter());

        assertEquals(1, helperHandler.getWorldCounter());
        assertTrue(helperHandler.isWorldOk());

        assertEquals(0, helperHandler.getEmCounter());

        // Step 2: change the value; observers of the reference are notified by set().
        worldRef.set("!");
        eventBus.join();
        assertEquals(1, removeHandler.getWorldCounter());
        assertTrue(removeHandler.isWorldOk());
        assertEquals(0, removeHandler.getEmCounter());

        assertEquals(1, helperHandler.getWorldCounter());
        assertTrue(helperHandler.isWorldOk());

        assertEquals(String.valueOf(factory), 1, helperHandler.getEmCounter());
        assertTrue(helperHandler.isEmOk());

        // Step 3: remove the posted reference through its handle.
        worldHandle.remove();
        eventBus.join();
        assertEquals(1, removeHandler.getWorldCounter());
        assertTrue(removeHandler.isWorldOk());
        assertEquals(1, removeHandler.getEmCounter());
        assertTrue(removeHandler.isEmOk());

        assertEquals(1, helperHandler.getWorldCounter());
        assertTrue(helperHandler.isWorldOk());

        assertEquals(1, helperHandler.getEmCounter());
        assertTrue(helperHandler.isEmOk());

        // After removal no observer may remain attached to the reference.
        assertTrue(worldRef.getObservers().isEmpty());
    }
}
1958
1959 }