001 package com.hammurapi.eventbus;
002
003 import com.hammurapi.eventbus.AbstractEventBus.Handle;
004
005 /**
006 * Discards post commands with events already present in event store.
007 * @author Pavel Vlasov
008 */
009 public class DuplicatesFilter<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements InferenceFilter<E,P,C,K,H,S> {
010
011 @SuppressWarnings("unchecked")
012 public boolean accept(InferenceCommand<E,P,C,K,H,S> inferenceCommand, EventBus<E,P,C,K,H,S> bus) {
013 if (inferenceCommand instanceof PostCommand) {
014 PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) inferenceCommand;
015 E event = postCommand.getEvent();
016 if (bus.getStore().getPrimaryKeyExtractor()==null) {
017 bus.getStore().readLock().lock();
018 try {
019 for (H handle: bus.getStore()) {
020 if (event.equals(handle.getEvent())) {
021 K handlerId = postCommand.getHandlerId();
022 EventHandler<E,P,C,H,S> handler = postCommand.getHandler();
023 Handle<E, P, C, K>[] inputs = postCommand.getInputs();
024 ((AbstractEventBus.Handle<E, P, C, K>) handle).addDerivation(handlerId, handler, inputs);
025 return false;
026 }
027 }
028 } finally {
029 bus.getStore().readLock().unlock();
030 }
031 } else {
032 return bus.getStore().getByPrimaryKey(event)==null;
033 }
034 }
035 return true;
036 }
037 }