| 1 | package com.hammurapi.eventbus; |
| 2 | |
| 3 | import com.hammurapi.eventbus.AbstractEventBus.Handle; |
| 4 | |
| 5 | /** |
| 6 | * Discards post commands with events already present in event store. |
| 7 | * @author Pavel Vlasov |
| 8 | */ |
| 9 | public class DuplicatesFilter<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements InferenceFilter<E,P,C,K,H,S> { |
| 10 | |
| 11 | @SuppressWarnings("unchecked") |
| 12 | public boolean accept(InferenceCommand<E,P,C,K,H,S> inferenceCommand, EventBus<E,P,C,K,H,S> bus) { |
| 13 | if (inferenceCommand instanceof PostCommand) { |
| 14 | PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) inferenceCommand; |
| 15 | E event = postCommand.getEvent(); |
| 16 | if (bus.getStore().getPrimaryKeyExtractor()==null) { |
| 17 | bus.getStore().readLock().lock(); |
| 18 | try { |
| 19 | for (H handle: bus.getStore()) { |
| 20 | if (event.equals(handle.getEvent())) { |
| 21 | K handlerId = postCommand.getHandlerId(); |
| 22 | EventHandler<E,P,C,H,S> handler = postCommand.getHandler(); |
| 23 | Handle<E, P, C, K>[] inputs = postCommand.getInputs(); |
| 24 | ((AbstractEventBus.Handle<E, P, C, K>) handle).addDerivation(handlerId, handler, inputs); |
| 25 | return false; |
| 26 | } |
| 27 | } |
| 28 | } finally { |
| 29 | bus.getStore().readLock().unlock(); |
| 30 | } |
| 31 | } else { |
| 32 | return bus.getStore().getByPrimaryKey(event)==null; |
| 33 | } |
| 34 | } |
| 35 | return true; |
| 36 | } |
| 37 | } |