001    package com.hammurapi.eventbus;
002    
003    import com.hammurapi.eventbus.AbstractEventBus.Handle;
004    
005    /**
006     * Discards post commands with events already present in event store. 
007     * @author Pavel Vlasov
008     */
009    public class DuplicatesFilter<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements InferenceFilter<E,P,C,K,H,S> {
010            
011            @SuppressWarnings("unchecked")
012            public boolean accept(InferenceCommand<E,P,C,K,H,S> inferenceCommand, EventBus<E,P,C,K,H,S> bus) {
013                    if (inferenceCommand instanceof PostCommand) {
014                            PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) inferenceCommand;
015                            E event = postCommand.getEvent();
016                            if (bus.getStore().getPrimaryKeyExtractor()==null) {
017                                    bus.getStore().readLock().lock();
018                                    try {
019                                            for (H handle: bus.getStore()) {
020                                                    if (event.equals(handle.getEvent())) {
021                                                            K handlerId = postCommand.getHandlerId();
022                                                            EventHandler<E,P,C,H,S> handler = postCommand.getHandler();
023                                                            Handle<E, P, C, K>[] inputs = postCommand.getInputs();
024                                                            ((AbstractEventBus.Handle<E, P, C, K>) handle).addDerivation(handlerId, handler, inputs);
025                                                            return false;
026                                                    }
027                                            }
028                                    } finally {
029                                            bus.getStore().readLock().unlock();
030                                    }
031                            } else {
032                                    return bus.getStore().getByPrimaryKey(event)==null;
033                            }
034                    }
035                    return true;
036            }
037    }