1 | package com.hammurapi.eventbus; |
2 | |
3 | import com.hammurapi.eventbus.AbstractEventBus.Handle; |
4 | |
5 | /** |
6 | * Discards post commands with events already present in event store. |
7 | * @author Pavel Vlasov |
8 | */ |
9 | public class DuplicatesFilter<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements InferenceFilter<E,P,C,K,H,S> { |
10 | |
11 | @SuppressWarnings("unchecked") |
12 | public boolean accept(InferenceCommand<E,P,C,K,H,S> inferenceCommand, EventBus<E,P,C,K,H,S> bus) { |
13 | if (inferenceCommand instanceof PostCommand) { |
14 | PostCommand<E,P,C,K,H,S> postCommand = (PostCommand<E,P,C,K,H,S>) inferenceCommand; |
15 | E event = postCommand.getEvent(); |
16 | if (bus.getStore().getPrimaryKeyExtractor()==null) { |
17 | bus.getStore().readLock().lock(); |
18 | try { |
19 | for (H handle: bus.getStore()) { |
20 | if (event.equals(handle.getEvent())) { |
21 | K handlerId = postCommand.getHandlerId(); |
22 | EventHandler<E,P,C,H,S> handler = postCommand.getHandler(); |
23 | Handle<E, P, C, K>[] inputs = postCommand.getInputs(); |
24 | ((AbstractEventBus.Handle<E, P, C, K>) handle).addDerivation(handlerId, handler, inputs); |
25 | return false; |
26 | } |
27 | } |
28 | } finally { |
29 | bus.getStore().readLock().unlock(); |
30 | } |
31 | } else { |
32 | return bus.getStore().getByPrimaryKey(event)==null; |
33 | } |
34 | } |
35 | return true; |
36 | } |
37 | } |