001    package com.hammurapi.eventbus.local;
002    
003    import java.lang.reflect.Array;
004    import java.util.ArrayList;
005    import java.util.Arrays;
006    import java.util.Collection;
007    import java.util.Collections;
008    import java.util.Comparator;
009    import java.util.Iterator;
010    import java.util.List;
011    import java.util.Set;
012    import java.util.concurrent.ExecutorService;
013    import java.util.concurrent.atomic.AtomicBoolean;
014    import java.util.concurrent.atomic.AtomicLong;
015    import java.util.concurrent.locks.Lock;
016    import java.util.concurrent.locks.ReadWriteLock;
017    import java.util.concurrent.locks.ReentrantLock;
018    import java.util.concurrent.locks.ReentrantReadWriteLock;
019    import java.util.logging.Level;
020    import java.util.logging.Logger;
021    
022    import com.hammurapi.common.Joiner;
023    import com.hammurapi.common.Joiner.Collector;
024    import com.hammurapi.eventbus.AbstractEventBus;
025    import com.hammurapi.eventbus.AbstractEventBus.Handle;
026    import com.hammurapi.eventbus.AbstractEventBus.Snapshot;
027    import com.hammurapi.eventbus.CompositeContext;
028    import com.hammurapi.eventbus.DispatchNetworkException;
029    import com.hammurapi.eventbus.EventBus;
030    import com.hammurapi.eventbus.EventDispatchContext;
031    import com.hammurapi.eventbus.EventDispatchException;
032    import com.hammurapi.eventbus.EventDispatchJoinContext;
033    import com.hammurapi.eventbus.EventHandler;
034    import com.hammurapi.eventbus.EventHandlerBase.Mode;
035    import com.hammurapi.eventbus.EventHandlerWrapper;
036    import com.hammurapi.eventbus.EventStore;
037    import com.hammurapi.eventbus.InferenceCommand;
038    import com.hammurapi.eventbus.InferenceContext;
039    import com.hammurapi.eventbus.InferencePolicy;
040    import com.hammurapi.eventbus.PredicateChainingMatcher;
041    import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle;
042    import com.hammurapi.extract.Predicate;
043    import com.hammurapi.extract.True;
044    
/**
 * This matcher performs matching in a straightforward way: it evaluates handler predicates sequentially, does not cache extracted values,
 * does not optimize event joins, and does not use the executor service. Its purpose is to help with debugging and troubleshooting of
 * handler/predicate logic and to serve as a baseline of matching performance.
 * 
 * This matcher doesn't support snapshots.
 * 
 * @author Pavel Vlasov
 *
 * @param <E> Event type.
 * @param <P> Handler priority type.
 * @param <C> Handler context type.
 * @param <S> Event store type.
 */
061    public class LocalSimpleMatcher<E, P extends Comparable<P>, C, S extends EventStore<E,P,C,AbstractEventBus.Handle<E, P, C, Long>,S>> implements LocalMatcher<E,P,C,S> {
062            private static final Logger logger = Logger.getLogger(PredicateChainingMatcher.class.getName());
063            private EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> eventBus;
064    
065            protected Collection<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>> wrappers = new ArrayList<EventHandlerWrapper<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>,S>>();
066            private ReadWriteLock lock = new ReentrantReadWriteLock(); 
067    
068            /**
069             * Lock for the inference network.
070             * @return
071             */
072            protected ReadWriteLock getLock() {
073                    return lock;
074            }
075    
076            private AtomicLong handlerCounter = new AtomicLong(-1);
077    
078            protected Long nextId() {
079                    LocalEventBusBase<E, P, C, S> localEventBusBase = (LocalEventBusBase<E,P,C,S>) eventBus;
080                    return localEventBusBase==null ? handlerCounter.decrementAndGet() : localEventBusBase.nextId();
081            }
082            
083            @Override
084            public Long addHandler(final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler) {
085                    if (eventHandler.getCardinality()<1) {
086                            throw new DispatchNetworkException("Handler cardinality cannot be less than one.");
087                    }
088                    getLock().writeLock().lock();
089                    try {
090                            final Long registrationKey = nextId();
091                            final Set<Long> keySet = Collections.unmodifiableSet(Collections.singleton(registrationKey));             
092                            if (eventHandler.getCardinality()==1) {
093                                    EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w = new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
094    
095                                            @Override
096                                            public boolean consumes() {
097                                                    return eventHandler.consumes();
098                                            }
099    
100                                            @Override
101                                            public P getPriority() {
102                                                    return eventHandler.getPriority();
103                                            }
104    
105                                            @Override
106                                            public int getCardinality() {
107                                                    return eventHandler.getCardinality();
108                                            }
109    
110                                            @Override
111                                            public void reset() {
112                                                    eventHandler.reset();
113                                            }
114    
115                                            @Override
116                                            public C getContext() {
117                                                    return eventHandler.getContext();
118                                            }
119    
120                                            @Override
121                                            public boolean isOneOff() {
122                                                    return eventHandler.isOneOff();
123                                            }
124    
125                                            @Override
126                                            public Mode getMode() {
127                                                    return eventHandler.getMode();
128                                            }
129    
130                                            private AtomicBoolean fired=new AtomicBoolean(false);
131    
132                                            @SuppressWarnings("unchecked")
133                                            @Override
134                                            public void post(
135                                                            EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
136                                                            InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,                                                        
137                                                            Handle<E,P,C,Long>... handles) {
138                                                    if (!isOneOff() || !fired.getAndSet(true)) {
139                                                            E[] events = (E[]) Array.newInstance(eventBus.getEventType(), handles.length);
140                                                            for (int i=0; i<handles.length; ++i) {
141                                                                    if (handles[i].isValid()) {
142                                                                            events[i] = handles[i].getEvent();
143                                                                    } else {
144                                                                            return;
145                                                                    }
146                                                            }
147                                                            
148                                                            if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
149                                                                    
150    //                                                              if (inferenceContext.getInferenceCommandsQueue()!=null) {
151    //                                                                      throw new IllegalArgumentException("Inference commands queue shall be null");
152    //                                                              }
153    
154                                                                    inferenceContext = inferenceContext.wrap();
155                                                            }
156                                                            
157                                                            LocalEventDispatchContextImpl<E,P,C,S> dispatchContext;
158                                                                                                                    
159                                                            if (eventHandler.getMode()==Mode.POST || eventHandler.getMode()==Mode.BOTH) {
160                                                                    if (handles.length==1) {
161                                                                            dispatchContext = new LocalEventDispatchContextImpl<E, P, C, S>(
162                                                                                            inferenceContext,
163                                                                                            eventHandler, 
164                                                                                            registrationKey, 
165                                                                                            handles, 
166                                                                                            events,
167                                                                                            Mode.POST);
168                                                                    } else {
169                                                                            dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
170                                                                                            inferenceContext,
171                                                                                            eventHandler, 
172                                                                                            registrationKey, 
173                                                                                            handles, 
174                                                                                            events, 
175                                                                                            (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context,
176                                                                                            Mode.POST);                                             
177                                                                    }                                       
178                                                                    
179                                                                    LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
180                                                                    try {
181                                                                            if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
182                                                                                    if (!getPredicate().extract(getContext(), null, events)) {
183                                                                                            throw new EventDispatchException("Handler predicate evaluated to false before post: "+getPredicate()+", handler "+eventHandler);
184                                                                                    }
185                                                                            }
186                                                                            eventHandler.post(dispatchContext, events);
187                                                                            
188                                                                            // If no exception - post events
189                                                                            if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
190                                                                                    inferenceContext.processInferenceCommands();
191                                                                            }               
192                                                                    } catch (Exception e) {
193                                                                            logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
194                                                                            if (eventBus.getExceptionHandler()!=null) {
195                                                                                    eventBus.getExceptionHandler().handleException(e);
196                                                                            }
197                                                                            if (inferenceContext.getRootHandle()!=null) {
198                                                                                    inferenceContext.getRootHandle().handleException(e);
199                                                                            }
200                                                                    } finally {
201                                                                            LocalEventDispatchContextImpl.threadContext.set(null);                                          
202                                                                    }
203                                                                    if (isOneOff()) {
204                                                                            ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
205                                                                    }
206                                                            }
207                                                            
208                                                            if (eventHandler.getMode()==Mode.REMOVE || eventHandler.getMode()==Mode.BOTH) {
209                                                                    RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(eventHandler, registrationKey, handles, eventBus.getEventType());
210                                                                    for (Handle<E,P,C,Long> handle: handles) {
211                                                                            ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
212                                                                    }
213                                                            }
214                                                    }
215                                            }
216    
217                                            @Override
218                                            public Set<Long> getRegistrationKeys() {
219                                                    return keySet;
220                                            }
221    
222                                            @Override
223                                            public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
224                                                    return eventHandler;
225                                            }
226    
227                                            @Override
228                                            public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
229                                                    // NOP                                          
230                                            }
231    
232                                            @Override
233                                            public Predicate<E, C> getPredicate() {
234                                                    return eventHandler.getPredicate();
235                                            }
236                                            
237                                    };
238                                    wrappers.add(w);
239                            } else {
240                                    HandleJoiner joiner = createJoiner(eventHandler, registrationKey);
241                                    for (int i=0; i<eventHandler.getCardinality(); ++i) {
242                                            wrappers.add(createJoinHandler(eventHandler, registrationKey, keySet, joiner, i));
243                                    }
244                            }
245                            return registrationKey;
246                    } finally {
247                            getLock().writeLock().unlock();
248                    }
249            }
250    
251            private EventHandlerWrapper<E, P, C, Long, Handle<E, P, C, Long>, S> createJoinHandler(
252                            final EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler,
253                            final Long registrationKey,
254                            final Set<Long> keySet,
255                            final HandleJoiner joiner, 
256                            final int idx) {
257                    
258                    return new EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>() {
259    
260                            @Override
261                            public boolean consumes() {
262                                    return eventHandler.consumes();
263                            }
264    
265                            @Override
266                            public P getPriority() {
267                                    return eventHandler.getPriority();
268                            }
269    
270                            @Override
271                            public int getCardinality() {
272                                    return eventHandler.getCardinality();
273                            }
274    
275                            @Override
276                            public void reset() {
277                                    eventHandler.reset();
278                            }
279    
280                            @Override
281                            public C getContext() {
282                                    return eventHandler.getContext();
283                            }
284    
285                            @Override
286                            public boolean isOneOff() {
287                                    return eventHandler.isOneOff();
288                            }
289    
290                            @Override
291                            public Mode getMode() {
292                                    return eventHandler.getMode();
293                            }
294    
295                            @Override
296                            public void post(
297                                            EventDispatchContext<E,P,C, AbstractEventBus.Handle<E,P,C,Long>, S> context, 
298                                            InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext,                                                        
299                                            Handle<E,P,C,Long>... handles) {
300                                    
301                                    if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
302                                            
303    //                                      if (inferenceContext.getInferenceCommandsQueue()!=null) {
304    //                                              throw new IllegalArgumentException("Inference commands queue shall be null");
305    //                                      }
306    
307                                            inferenceContext = inferenceContext.wrap();
308                                    }
309                                                                    
310                                    CompositeContext<E, P, C, Long, Handle<E, P, C, Long>, S> compositeContext = new CompositeContext<E, P, C, Long,AbstractEventBus.Handle<E,P,C,Long>, S>(
311                                                    context,                                                
312                                                    eventHandler.getContext(), 
313                                                    inferenceContext);
314                                    
315                                    try {
316                                            joiner.addInput(idx, handles[0], compositeContext);
317                                    } catch (Exception e) {
318                                            logger.log(Level.SEVERE, "Join problem: "+e, e);
319                                            if (eventBus.getExceptionHandler()!=null) {
320                                                    eventBus.getExceptionHandler().handleException(e);
321                                            }
322                                            if (inferenceContext.getRootHandle()!=null) {
323                                                    inferenceContext.getRootHandle().handleException(e);
324                                            }
325                                    } finally {                                     
326                                            if (InferencePolicy.AFTER_HANDLER.equals(eventBus.getInferencePolicy())) {
327                                                    inferenceContext.processInferenceCommands();
328                                            }                                                       
329                                    }
330                                            
331                            }
332    
333                            @Override
334                            public Set<Long> getRegistrationKeys() {
335                                    return keySet;
336                            }
337    
338                            @Override
339                            public EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> getHandler() {
340                                    return eventHandler;
341                            }
342    
343                            @Override
344                            public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot, Set<Long> taken) {
345                                    // NOP                                          
346                            }
347    
348                            @Override
349                            public Predicate<E, C> getPredicate() {
350                                    /**
351                                     * Always returns true because predicate is evaluated in join().
352                                     */
353                                    return True.getInstance();
354                            }
355                            
356                    };
357            }
358    
359            @Override
360            public Iterable<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> match(E event, ExecutorService executorService) {              
361                    @SuppressWarnings("unchecked")
362                    E[] events = (E[]) Array.newInstance(eventBus.getEventType(), 1);
363                    events[0] = event;
364                    
365                    List<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> ret = new ArrayList<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>();
366                    getLock().readLock().lock();
367                    try {
368                            for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
369                                    if (w.getPredicate().extract(w.getContext(), null, events)) {
370                                            ret.add(w);
371                                    }
372                            }
373                    } finally {
374                            getLock().readLock().unlock();
375                    }                       
376                    
377                    Collections.sort(ret, new Comparator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>>() {
378    
379                            @Override
380                            public int compare(EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o1, EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> o2) {                                
381                                    P p1 = o1.getPriority();
382                                    P p2 = o2.getPriority();
383                                    if (p2==null) {
384                                            return p1==null ? o2.hashCode() - o1.hashCode() : 1; 
385                                    }
386                                    if (p1==null) {
387                                            return -1;
388                                    }
389                                    return p2.compareTo(p1);
390                            }
391                            
392                    });
393                    
394                    return ret;
395            }
396    
397            @Override
398            public void takeSnapshot(Snapshot<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> snapshot) {
399                    // TODO - Implement.
400                    throw new UnsupportedOperationException("LocalSimpleMatcher does not support takeSnapshot() operation");
401            }
402    
403            @Override
404            public void removeHandlers(Iterable<Long> keys) {
405                    getLock().writeLock().lock();
406                    try {
407                            for (Long key: keys) {
408                                    Iterator<EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>> it = wrappers.iterator();
409                                    Set<Long> rKeys = it.next().getRegistrationKeys();
410                                    for (Long rKey: rKeys) {
411                                            if (rKey.equals(key)) {
412                                                    it.remove();
413                                                    break;
414                                            }
415                                    }
416                            }
417                    } finally {
418                            getLock().writeLock().unlock();
419                    }               
420            }
421    
422            @Override
423            public void reset() {
424                    getLock().readLock().lock();
425                    try {
426                            for (EventHandlerWrapper<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> w: wrappers) {
427                                    w.reset();
428                            }
429                    } finally {
430                            getLock().readLock().unlock();
431                    }               
432            }
433    
434            @Override
435            public void manageHandlers(HandlerManager<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> handlerManager) {
436                    getLock().writeLock().lock();
437                    try {
438                            handlerManager.manageHandlers(this);
439                    } finally {
440                            getLock().writeLock().unlock();
441                    }
442            }
443    
444            @Override
445            public void setEventBus(EventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> bus) {
446                    this.eventBus = bus;            
447            }
448            
449            private class HandleJoiner extends Joiner<Handle<E,P,C,Long>, CompositeContext<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>, Object> {
450                    
451                    Lock lock = new ReentrantLock();
452    
453                    private EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler;
454                    private Collector<Handle<E,P,C,Long>>[] ic;
455    
456                    private Long registrationKey;
457    
458                    public HandleJoiner(
459                                    Collector<Handle<E,P,C,Long>>[] inputCollectors, 
460                                    Class<Handle<E,P,C,Long>> inputType, 
461                                    EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> handler,
462                                    Long registrationKey) {
463                            
464                            super(inputCollectors, inputType, false);
465                            this.ic = inputCollectors;
466                            this.handler = handler;
467                            this.registrationKey = registrationKey;
468                    }
469    
470                    @Override
471                    protected void startJoin() {
472    //                      System.out.println("Lock by "+Thread.currentThread());
473                            lock.lock();            
474                    }
475                    
476                    @Override
477                    protected void endJoin() {
478    //                      System.out.println("UnLock by "+Thread.currentThread());
479                            lock.unlock();          
480                    }
481                    
482                    private AtomicBoolean fired=new AtomicBoolean(false);
483                    
484                    @Override
485                    protected Object join(
486                                    final Handle<E,P,C,Long>[] inputs, 
487                                    final CompositeContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> context, 
488                                    final Joiner.InputConsumer consumer, 
489                                    int activator) throws Exception {
490                            
491                            if (!handler.isOneOff() || !fired.getAndSet(true)) {                            
492                                    final E[] events = (E[]) Array.newInstance(eventBus.getEventType(), inputs.length);
493                                    for (int i=0; i<inputs.length; ++i) {
494                                            if (inputs[i].isValid()) {
495                                                    events[i] = inputs[i].getEvent();
496                                            } else {
497                                                    return null;
498                                            }
499                                    }
500                                    
501                                    if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
502                                            return null;
503                                    }
504                                                                    
505                                    if (handler.getMode()==Mode.POST || handler.getMode()==Mode.BOTH) {
506                                            LocalEventDispatchContextImpl<E,P,C,S> dispatchContext = new LocalEventDispatchJoinContextImpl<E, P, C, S>(
507                                                            context.getInferenceContext(),
508                                                            handler, 
509                                                            registrationKey, 
510                                                            inputs, 
511                                                            events, 
512                                                            (EventDispatchJoinContext<E, P, C, AbstractEventBus.Handle<E,P,C,Long>, S>) context.getEventDispatchContext(),
513                                                            Mode.POST) {
514                                                                                            
515                                            @Override
516                                            public void consumeJoin(int index) {
517                                                    consumer.consume(index);
518                                            }
519    
520                                            @Override
521                                            public void consumeJoin(E event) {
522                                                    for (int i=0; i<events.length; ++i) {
523                                                            if (event==events[i]) {
524                                                                    consumeJoin(i);
525                                                                    break;
526                                                            }
527                                                    }
528                                            }
529                                    };
530                                                                    
531                                    LocalEventDispatchContextImpl.threadContext.set(dispatchContext);
532                                            try {
533                                                    if (((LocalEventBusBase<E,P,C,S>) eventBus).isAssertPredicatesBeforePost()) {
534                                                            if (!handler.getPredicate().extract(handler.getContext(), null, events)) {
535                                                                    throw new EventDispatchException("Handler predicate evaluated to false before post: "+handler.getPredicate()+", handler "+handler);
536                                                            }
537                                                    }
538                                                    handler.post(dispatchContext, events);
539                                            } catch (Exception e) {
540                                                    logger.log(Level.SEVERE, "Exception in event handler: "+e, e);
541                                                    if (eventBus.getExceptionHandler()!=null) {
542                                                            eventBus.getExceptionHandler().handleException(e);
543                                                    }
544                                                    if (context.getInferenceContext().getRootHandle()!=null) {
545                                                            context.getInferenceContext().getRootHandle().handleException(e);
546                                                    }
547                                            } finally {
548                                                    LocalEventDispatchContextImpl.threadContext.set(null);                                          
549                                            }
550                                            if (handler.isOneOff()) {
551                                                    ((LocalEventBusBase<E,P,C,S>) eventBus).removeHandlers(registrationKey);
552                                                                                                    
553                                            }
554                                    }
555                                    
556                                    if (handler.getMode()==Mode.REMOVE || handler.getMode()==Mode.BOTH) {
557                                            RemoveListener<E,P,C,S> removeListener = new RemoveListener<E, P, C, S>(handler, registrationKey, inputs, eventBus.getEventType());
558                                            for (Handle<E,P,C,Long> handle: inputs) {
559                                                    ((LocalHandle<E,P,C,S>) handle).addRemoveListener(removeListener);
560                                            }
561                                    }
562                                    
563                            }
564                            return null;
565                    }
566                    
567                    @Override
568                    public String toString() {
569                            return "Simple HandleJoiner [handler=" + handler + ", ic=" + Arrays.toString(ic) + "]";
570                    };
571                    
572            }               
573            
574            @SuppressWarnings("unchecked")
575            protected HandleJoiner createJoiner(EventHandler<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S> eventHandler, Long registrationKey) {
576                    
577                    Collector<Handle<E,P,C,Long>>[] inputCollectors = new Collector[eventHandler.getCardinality()];
578                    for (int i=0; i<inputCollectors.length; ++i) {
579                            inputCollectors[i] = new Joiner.CollectionAdapter<Handle<E,P,C,Long>>(new ArrayList<Handle<E,P,C,Long>>() /* createCollection()*/) {
580                                    
581                                    /**
582                                     * Override to replace handles with facades of appropriate type.
583                                     */
584                                    public boolean add(AbstractEventBus.Handle<E,P,C,Long> handle) {
585                                            if (handle instanceof MasterHandle) {
586                                                    return super.add(new FacadeHandle((MasterHandle) handle, ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
587                                            } else if (handle instanceof FacadeHandle) {
588                                                    return super.add(new FacadeHandle(((FacadeHandle) handle).getMaster(), ((LocalEventBusBase<E,P,C,S>) eventBus).getCollectorHandleStrength()));
589                                            } else {
590                                                    throw new IllegalArgumentException("Unexpected handle type: "+handle.getClass());
591                                            }
592                                    };
593                                    
594                                    public boolean remove(AbstractEventBus.Handle<E,P,C,Long> obj) {
595    //                                      System.out.println("Before remove: "+this);
596                                            Iterator<Handle<E, P, C, Long>> it = iterator();
597                                            while (it.hasNext()) {
598                                                    if (obj.getId().equals(it.next().getId())) {
599                                                            it.remove();
600                                                            return true;
601                                                    }
602                                            }
603                                            
604                                            return false;
605                                    };
606                            };
607                    }
608                    Class inputClass = Handle.class;
609                    return new HandleJoiner(inputCollectors, inputClass, eventHandler, registrationKey);            
610            }
611    
612    //      private Collection<Handle<E, P, C, Long>> createCollection() {
613    //              return InvocationRecordingProxyFactory.wrap(Collection.class, new ArrayList<Handle<E,P,C,Long>>(), new ProblemListener() {
614    //
615    //                      @Override
616    //                      public void onProblem(Throwable th, List<InvocationRecord> records) {
617    //                              Object[] ra = records.toArray();
618    //                              for (int i=0; i<ra.length; ++i) {
619    //                                      System.out.println(i+" "+ra[i]);
620    //                              }
621    //                              
622    //                              th.printStackTrace();
623    //                      }
624    //                      
625    //              });
626    //      }
627            
628            
629    }