1 | package com.hammurapi.eventbus.local; |
2 | |
3 | import java.lang.reflect.Array; |
4 | import java.util.ArrayList; |
5 | import java.util.Arrays; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.Comparator; |
9 | import java.util.Iterator; |
10 | import java.util.List; |
11 | import java.util.Set; |
12 | import java.util.concurrent.ExecutorService; |
13 | import java.util.concurrent.atomic.AtomicBoolean; |
14 | import java.util.concurrent.atomic.AtomicLong; |
15 | import java.util.concurrent.locks.Lock; |
16 | import java.util.concurrent.locks.ReadWriteLock; |
17 | import java.util.concurrent.locks.ReentrantLock; |
18 | import java.util.concurrent.locks.ReentrantReadWriteLock; |
19 | import java.util.logging.Level; |
20 | import java.util.logging.Logger; |
21 | |
22 | import com.hammurapi.common.Joiner; |
23 | import com.hammurapi.common.Joiner.Collector; |
24 | import com.hammurapi.eventbus.AbstractEventBus; |
25 | import com.hammurapi.eventbus.AbstractEventBus.Handle; |
26 | import com.hammurapi.eventbus.AbstractEventBus.Snapshot; |
27 | import com.hammurapi.eventbus.CompositeContext; |
28 | import com.hammurapi.eventbus.DispatchNetworkException; |
29 | import com.hammurapi.eventbus.EventBus; |
30 | import com.hammurapi.eventbus.EventDispatchContext; |
31 | import com.hammurapi.eventbus.EventDispatchException; |
32 | import com.hammurapi.eventbus.EventDispatchJoinContext; |
33 | import com.hammurapi.eventbus.EventHandler; |
34 | import com.hammurapi.eventbus.EventHandlerBase.Mode; |
35 | import com.hammurapi.eventbus.EventHandlerWrapper; |
36 | import com.hammurapi.eventbus.EventStore; |
37 | import com.hammurapi.eventbus.InferenceCommand; |
38 | import com.hammurapi.eventbus.InferenceContext; |
39 | import com.hammurapi.eventbus.InferencePolicy; |
40 | import com.hammurapi.eventbus.PredicateChainingMatcher; |
41 | import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle; |
42 | import com.hammurapi.extract.Predicate; |
43 | import com.hammurapi.extract.True; |
44 | |
45 | /** |
46 | * This matcher does matching in a straightforward way - it evaluates handler predicates sequentially, it does not cache extracted values, does not |
47 | * optimize event joins, and does not use executor service. The purpose of this matcher is to help with debugging and troubleshooting of handler/predicate |
48 | * logic and to be a baseline of matching performance. |
49 | * |
50 | * This matcher doesn't support snapshots. |
51 | * |
52 | * @author Pavel Vlasov |
53 | * |
54 | * @param <E> |
55 | * @param <P> |
56 | * @param <C> |
57 | * @param <K> |
58 | * @param <H> |
59 | * @param <S> |
60 | */ |
61 | public class LocalSimpleMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> implements LocalMatcher<E,P,C,S> { |
62 | private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName()); |
63 | private EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> eventBus; |
64 | |
65 | protected Collection<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>> wrappers = new ArrayList<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>>(); |
66 | private ReadWriteLock lock = new ReentrantReadWriteLock(); |
67 | |
68 | /** |
69 | * Lock for the inference network. |
70 | * @return |
71 | */ |
72 | protected ReadWriteLock getLock() { |
73 | return lock; |
74 | } |
75 | |
76 | private AtomicLong handlerCounter = new AtomicLong(-1); |
77 | |
78 | protected Long nextId() { |
79 | LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) eventBus; |
80 | return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId(); |
81 | } |
82 | |
83 | @Override |
84 | public Long addHandler(final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler) { |
85 | if (eventHandler.getCardinality()<1) { |
86 | throw new DispatchNetworkException("Handler cardinality cannot be less than one."); |
87 | } |
88 | getLock().writeLock().lock(); |
89 | try { |
90 | final Long registrationKey = nextId(); |
91 | final Set<Long> keySet = Collections.unmodifiableSet(Collections.singleton(registrationKey)); |
92 | if (eventHandler.getCardinality()==1) { |
93 | EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w = new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() { |
94 | |
95 | @Override |
96 | public boolean consumes() { |
97 | return eventHandler.consumes(); |
98 | } |
99 | |
100 | @Override |
101 | public P getPriority() { |
102 | return eventHandler.getPriority(); |
103 | } |
104 | |
105 | @Override |
106 | public int getCardinality() { |
107 | return eventHandler.getCardinality(); |
108 | } |
109 | |
110 | @Override |
111 | public void reset() { |
112 | eventHandler.reset(); |
113 | } |
114 | |
115 | @Override |
116 | public C getContext() { |
117 | return eventHandler.getContext(); |
118 | } |
119 | |
120 | @Override |
121 | public boolean isOneOff() { |
122 | return eventHandler.isOneOff(); |
123 | } |
124 | |
125 | @Override |
126 | public Mode getMode() { |
127 | return eventHandler.getMode(); |
128 | } |
129 | |
130 | private AtomicBoolean fired=new AtomicBoolean(false); |
131 | |
132 | @SuppressWarnings("unchecked") |
133 | @Override |
134 | public void post( |
135 | EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, |
136 | InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext, |
137 | Handle<E,P,C,Long>... handles) { |
138 | if (!isOneOff() || !fired.getAndSet(true)) { |
139 | E[] events = (E[]) Array.newInstance(eventBus.getEventType(), handles.length); |
140 | for (int i=0; i<handles.length; ++i) { |
141 | if (handles[i].isValid()) { |
142 | events[i] = handles[i].getEvent(); |
143 | } else { |
144 | return; |
145 | } |
146 | } |
147 | |
148 | if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) { |
149 | |
150 | // if (inferenceContext.getInferenceCommandsQueue()!=null) { |
151 | // throw new IllegalArgumentException("Inference commands queue shall be null"); |
152 | // } |
153 | |
154 | inferenceContext = inferenceContext.wrap(); |
155 | } |
156 | |
157 | LocalEventDispatchContextImpl<E,P,C,S> dispatchContext; |
158 | |
159 | if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) { |
160 | if (handles.length==1) { |
161 | dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>( |
162 | inferenceContext, |
163 | eventHandler, |
164 | registrationKey, |
165 | handles, |
166 | events, |
167 | Mode.POST); |
168 | } else { |
169 | dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>( |
170 | inferenceContext, |
171 | eventHandler, |
172 | registrationKey, |
173 | handles, |
174 | events, |
175 | (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context, |
176 | Mode.POST); |
177 | } |
178 | |
179 | LocalEventDispatchContextImpl.threadContext.set(dispatchContext); |
180 | try { |
181 | if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) { |
182 | if (!getPredicate().extract(getContext(), null, events)) { |
183 | throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler); |
184 | } |
185 | } |
186 | eventHandler.post(dispatchContext, events); |
187 | |
188 | // If no exception - post events |
189 | if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) { |
190 | inferenceContext.processInferenceCommands(); |
191 | } |
192 | } catch (Exception e) { |
193 | logger.log(Level.SEVERE, "Exception in event handler: "+e, e); |
194 | if (eventBus.getExceptionHandler()!=null) { |
195 | eventBus.getExceptionHandler().handleException(e); |
196 | } |
197 | if (inferenceContext.getRootHandle()!=null) { |
198 | inferenceContext.getRootHandle().handleException(e); |
199 | } |
200 | } finally { |
201 | LocalEventDispatchContextImpl.threadContext.set(null); |
202 | } |
203 | if (isOneOff()) { |
204 | ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey); |
205 | } |
206 | } |
207 | |
208 | if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) { |
209 | RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, eventBus.getEventType()); |
210 | for (Handle<E,P,C,Long> handle: handles) { |
211 | ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener); |
212 | } |
213 | } |
214 | } |
215 | } |
216 | |
217 | @Override |
218 | public Set<Long> getRegistrationKeys() { |
219 | return keySet; |
220 | } |
221 | |
222 | @Override |
223 | public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() { |
224 | return eventHandler; |
225 | } |
226 | |
227 | @Override |
228 | public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) { |
229 | // NOP |
230 | } |
231 | |
232 | @Override |
233 | public Predicate<E, C> getPredicate() { |
234 | return eventHandler.getPredicate(); |
235 | } |
236 | |
237 | }; |
238 | wrappers.add(w); |
239 | } else { |
240 | HandleJoiner joiner = createJoiner(eventHandler, registrationKey); |
241 | for (int i=0; i<eventHandler.getCardinality(); ++i) { |
242 | wrappers.add(createJoinHandler(eventHandler, registrationKey, keySet, joiner, i)); |
243 | } |
244 | } |
245 | return registrationKey; |
246 | } finally { |
247 | getLock().writeLock().unlock(); |
248 | } |
249 | } |
250 | |
251 | private EventHandlerWrapper<E, P, C, Long, Handle<E, P, C, Long>, S> createJoinHandler( |
252 | final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler, |
253 | final Long registrationKey, |
254 | final Set<Long> keySet, |
255 | final HandleJoiner joiner, |
256 | final int idx) { |
257 | |
258 | return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() { |
259 | |
260 | @Override |
261 | public boolean consumes() { |
262 | return eventHandler.consumes(); |
263 | } |
264 | |
265 | @Override |
266 | public P getPriority() { |
267 | return eventHandler.getPriority(); |
268 | } |
269 | |
270 | @Override |
271 | public int getCardinality() { |
272 | return eventHandler.getCardinality(); |
273 | } |
274 | |
275 | @Override |
276 | public void reset() { |
277 | eventHandler.reset(); |
278 | } |
279 | |
280 | @Override |
281 | public C getContext() { |
282 | return eventHandler.getContext(); |
283 | } |
284 | |
285 | @Override |
286 | public boolean isOneOff() { |
287 | return eventHandler.isOneOff(); |
288 | } |
289 | |
290 | @Override |
291 | public Mode getMode() { |
292 | return eventHandler.getMode(); |
293 | } |
294 | |
295 | @Override |
296 | public void post( |
297 | EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, |
298 | InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext, |
299 | Handle<E,P,C,Long>... handles) { |
300 | |
301 | if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) { |
302 | |
303 | // if (inferenceContext.getInferenceCommandsQueue()!=null) { |
304 | // throw new IllegalArgumentException("Inference commands queue shall be null"); |
305 | // } |
306 | |
307 | inferenceContext = inferenceContext.wrap(); |
308 | } |
309 | |
310 | CompositeContext<E, P, C, Long, Handle<E, P, C, Long>, S> compositeContext = new CompositeContext<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S>( |
311 | context, |
312 | eventHandler.getContext(), |
313 | inferenceContext); |
314 | |
315 | try { |
316 | joiner.addInput(idx, handles[0], compositeContext); |
317 | } catch (Exception e) { |
318 | logger.log(Level.SEVERE, "Join problem: "+e, e); |
319 | if (eventBus.getExceptionHandler()!=null) { |
320 | eventBus.getExceptionHandler().handleException(e); |
321 | } |
322 | if (inferenceContext.getRootHandle()!=null) { |
323 | inferenceContext.getRootHandle().handleException(e); |
324 | } |
325 | } finally { |
326 | if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) { |
327 | inferenceContext.processInferenceCommands(); |
328 | } |
329 | } |
330 | |
331 | } |
332 | |
333 | @Override |
334 | public Set<Long> getRegistrationKeys() { |
335 | return keySet; |
336 | } |
337 | |
338 | @Override |
339 | public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() { |
340 | return eventHandler; |
341 | } |
342 | |
343 | @Override |
344 | public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) { |
345 | // NOP |
346 | } |
347 | |
348 | @Override |
349 | public Predicate<E, C> getPredicate() { |
350 | /** |
351 | * Always returns true because predicate is evaluated in join(). |
352 | */ |
353 | return True.getInstance(); |
354 | } |
355 | |
356 | }; |
357 | } |
358 | |
359 | @Override |
360 | public Iterable<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> match(E event, ExecutorService executorService) { |
361 | @SuppressWarnings("unchecked") |
362 | E[] events = (E[]) Array.newInstance(eventBus.getEventType(), 1); |
363 | events[0] = event; |
364 | |
365 | List<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> ret = new ArrayList<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>(); |
366 | getLock().readLock().lock(); |
367 | try { |
368 | for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) { |
369 | if (w.getPredicate().extract(w.getContext(), null, events)) { |
370 | ret.add(w); |
371 | } |
372 | } |
373 | } finally { |
374 | getLock().readLock().unlock(); |
375 | } |
376 | |
377 | Collections.sort(ret, new Comparator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>() { |
378 | |
379 | @Override |
380 | public int compare(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o1, EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o2) { |
381 | P p1 = o1.getPriority(); |
382 | P p2 = o2.getPriority(); |
383 | if (p2==null) { |
384 | return p1==null ? o2.hashCode() - o1.hashCode() : 1; |
385 | } |
386 | if (p1==null) { |
387 | return -1; |
388 | } |
389 | return p2.compareTo(p1); |
390 | } |
391 | |
392 | }); |
393 | |
394 | return ret; |
395 | } |
396 | |
397 | @Override |
398 | public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot) { |
399 | // TODO - Implement. |
400 | throw new UnsupportedOperationException("LocalSimpleMatcher does not support takeSnapshot() operation"); |
401 | } |
402 | |
403 | @Override |
404 | public void removeHandlers(Iterable<Long> keys) { |
405 | getLock().writeLock().lock(); |
406 | try { |
407 | for (Long key: keys) { |
408 | Iterator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> it = wrappers.iterator(); |
409 | Set<Long> rKeys = it.next().getRegistrationKeys(); |
410 | for (Long rKey: rKeys) { |
411 | if (rKey.equals(key)) { |
412 | it.remove(); |
413 | break; |
414 | } |
415 | } |
416 | } |
417 | } finally { |
418 | getLock().writeLock().unlock(); |
419 | } |
420 | } |
421 | |
422 | @Override |
423 | public void reset() { |
424 | getLock().readLock().lock(); |
425 | try { |
426 | for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) { |
427 | w.reset(); |
428 | } |
429 | } finally { |
430 | getLock().readLock().unlock(); |
431 | } |
432 | } |
433 | |
434 | @Override |
435 | public void manageHandlers(HandlerManager<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handlerManager) { |
436 | getLock().writeLock().lock(); |
437 | try { |
438 | handlerManager.manageHandlers(this); |
439 | } finally { |
440 | getLock().writeLock().unlock(); |
441 | } |
442 | } |
443 | |
444 | @Override |
445 | public void setEventBus(EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> bus) { |
446 | this.eventBus = bus; |
447 | } |
448 | |
449 | private class HandleJoiner extends Joiner<Handle<E,P,C,Long>, CompositeContext<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>, Object> { |
450 | |
451 | Lock lock = new ReentrantLock(); |
452 | |
453 | private EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler; |
454 | private Collector<Handle<E,P,C,Long>>[] ic; |
455 | |
456 | private Long registrationKey; |
457 | |
458 | public HandleJoiner( |
459 | Collector<Handle<E,P,C,Long>>[] inputCollectors, |
460 | Class<Handle<E,P,C,Long>> inputType, |
461 | EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler, |
462 | Long registrationKey) { |
463 | |
464 | super(inputCollectors, inputType, false); |
465 | this.ic = inputCollectors; |
466 | this.handler = handler; |
467 | this.registrationKey = registrationKey; |
468 | } |
469 | |
470 | @Override |
471 | protected void startJoin() { |
472 | // System.out.println("Lock by "+Thread.currentThread()); |
473 | lock.lock(); |
474 | } |
475 | |
476 | @Override |
477 | protected void endJoin() { |
478 | // System.out.println("UnLock by "+Thread.currentThread()); |
479 | lock.unlock(); |
480 | } |
481 | |
482 | private AtomicBoolean fired=new AtomicBoolean(false); |
483 | |
484 | @Override |
485 | protected Object join( |
486 | final Handle<E,P,C,Long>[] inputs, |
487 | final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context, |
488 | final Joiner.InputConsumer consumer, |
489 | int activator) throws Exception { |
490 | |
491 | if (!handler.isOneOff() || !fired.getAndSet(true)) { |
492 | final E[] events = (E[]) Array.newInstance(eventBus.getEventType(), inputs.length); |
493 | for (int i=0; i<inputs.length; ++i) { |
494 | if (inputs[i].isValid()) { |
495 | events[i] = inputs[i].getEvent(); |
496 | } else { |
497 | return null; |
498 | } |
499 | } |
500 | |
501 | if (!handler.getPredicate().extract(handler.getContext(), null, events)) { |
502 | return null; |
503 | } |
504 | |
505 | if (handler.getMode()==Mode.POST || handler.getMode()==Mode.BOTH) { |
506 | LocalEventDispatchContextImpl<E,P,C,S> dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>( |
507 | context.getInferenceContext(), |
508 | handler, |
509 | registrationKey, |
510 | inputs, |
511 | events, |
512 | (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context.getEventDispatchContext(), |
513 | Mode.POST) { |
514 | |
515 | @Override |
516 | public void consumeJoin(int index) { |
517 | consumer.consume(index); |
518 | } |
519 | |
520 | @Override |
521 | public void consumeJoin(E event) { |
522 | for (int i=0; i<events.length; ++i) { |
523 | if (event==events[i]) { |
524 | consumeJoin(i); |
525 | break; |
526 | } |
527 | } |
528 | } |
529 | }; |
530 | |
531 | LocalEventDispatchContextImpl.threadContext.set(dispatchContext); |
532 | try { |
533 | if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) { |
534 | if (!handler.getPredicate().extract(handler.getContext(), null, events)) { |
535 | throw new EventDispatchException("Handler predicate evaluated to false before post: "+handler.getPredicate()+", handler "+handler); |
536 | } |
537 | } |
538 | handler.post(dispatchContext, events); |
539 | } catch (Exception e) { |
540 | logger.log(Level.SEVERE, "Exception in event handler: "+e, e); |
541 | if (eventBus.getExceptionHandler()!=null) { |
542 | eventBus.getExceptionHandler().handleException(e); |
543 | } |
544 | if (context.getInferenceContext().getRootHandle()!=null) { |
545 | context.getInferenceContext().getRootHandle().handleException(e); |
546 | } |
547 | } finally { |
548 | LocalEventDispatchContextImpl.threadContext.set(null); |
549 | } |
550 | if (handler.isOneOff()) { |
551 | ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey); |
552 | |
553 | } |
554 | } |
555 | |
556 | if (handler.getMode()==Mode.REMOVE || handler.getMode()==Mode.BOTH) { |
557 | RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(handler, registrationKey, inputs, eventBus.getEventType()); |
558 | for (Handle<E,P,C,Long> handle: inputs) { |
559 | ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener); |
560 | } |
561 | } |
562 | |
563 | } |
564 | return null; |
565 | } |
566 | |
567 | @Override |
568 | public String toString() { |
569 | return "Simple HandleJoiner [handler=" + handler + ", ic=" + Arrays.toString(ic) + "]"; |
570 | }; |
571 | |
572 | } |
573 | |
574 | @SuppressWarnings("unchecked") |
575 | protected HandleJoiner createJoiner(EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler, Long registrationKey) { |
576 | |
577 | Collector<Handle<E,P,C,Long>>[] inputCollectors = new Collector[eventHandler.getCardinality()]; |
578 | for (int i=0; i<inputCollectors.length; ++i) { |
579 | inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>>(new ArrayList<Handle<E,P,C,Long>>() /* createCollection()*/) { |
580 | |
581 | /** |
582 | * Override to replace handles with facades of appropriate type. |
583 | */ |
584 | public boolean add(AbstractEventBus.Handle<E,P,C,Long> handle) { |
585 | if (handle instanceof MasterHandle) { |
586 | return super.add(new FacadeHandle((MasterHandle) handle, ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength())); |
587 | } else if (handle instanceof FacadeHandle) { |
588 | return super.add(new FacadeHandle(((FacadeHandle) handle).getMaster(), ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength())); |
589 | } else { |
590 | throw new IllegalArgumentException("Unexpected handle type: "+handle.getClass()); |
591 | } |
592 | }; |
593 | |
594 | public boolean remove(AbstractEventBus.Handle<E,P,C,Long> obj) { |
595 | // System.out.println("Before remove: "+this); |
596 | Iterator<Handle<E, P, C, Long>> it = iterator(); |
597 | while (it.hasNext()) { |
598 | if (obj.getId().equals(it.next().getId())) { |
599 | it.remove(); |
600 | return true; |
601 | } |
602 | } |
603 | |
604 | return false; |
605 | }; |
606 | }; |
607 | } |
608 | Class inputClass = Handle.class; |
609 | return new HandleJoiner(inputCollectors, inputClass, eventHandler, registrationKey); |
610 | } |
611 | |
612 | // private Collection<Handle<E, P, C, Long>> createCollection() { |
613 | // return InvocationRecordingProxyFactory.wrap(Collection.class, new ArrayList<Handle<E,P,C,Long>>(), new ProblemListener() { |
614 | // |
615 | // @Override |
616 | // public void onProblem(Throwable th, List<InvocationRecord> records) { |
617 | // Object[] ra = records.toArray(); |
618 | // for (int i=0; i<ra.length; ++i) { |
619 | // System.out.println(i+" "+ra[i]); |
620 | // } |
621 | // |
622 | // th.printStackTrace(); |
623 | // } |
624 | // |
625 | // }); |
626 | // } |
627 | |
628 | |
629 | } |