001 package com.hammurapi.eventbus;
002
003 import java.io.Serializable;
004 import java.util.Arrays;
005
006 import com.hammurapi.eventbus.AbstractEventBus.Handle;
007 import com.hammurapi.extract.Predicate;
008
009 /**
010 * Handler which maps input tuples like {{E1, E2, E5}, {E4, E3}} to {E1, E2, E3, E4, E5}
011 * @author Pavel Vlasov
012 *
013 * @param <E>
014 * @param <P>
015 * @param <C>
016 */
017 public class MappingEventHandler<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E, P, C>, S extends EventStore<E,P,C,H,S>> implements JoinEventHandler<E, P, C, K, H, S>, Serializable {
018
019 private EventHandlerWrapper<E, P, C, K, H, S> target;
020 private Mapper<Handle<E,P,C,K>>[] mappers;
021
022 public MappingEventHandler(EventHandlerWrapper<E, P, C, K, H, S> target, Mapper<Handle<E,P,C,K>>[] mappers) {
023 this.target = target;
024 this.mappers = mappers;
025 }
026
027 @Override
028 public boolean consumes() {
029 return target.consumes();
030 }
031
032 @Override
033 public int getCardinality() {
034 return mappers.length;
035 }
036
037 @Override
038 public C getContext() {
039 return target.getContext();
040 }
041
042 @Override
043 public P getPriority() {
044 return target.getPriority();
045 }
046
047 @SuppressWarnings("unchecked")
048 @Override
049 public void post(
050 final EventDispatchContext<E,P,C,H,S> context,
051 InferenceContext<E,P,C,K,H,S> inferenceContext,
052 final Handle<E,P,C,K>[]... handles) {
053
054 Handle<E,P,C,K>[] targetInput = new Handle[target.getCardinality()];
055 for (int i=0; i<mappers.length; ++i) {
056 mappers[i].map(handles[i], targetInput);
057 }
058
059 EventDispatchJoinContext<E,P,C,H,S> mappingContext = new EventDispatchJoinContextFilter<E,P,C,H,S>(context) {
060
061 public void consumeJoin(E event) {
062 for (int i=0; i<handles.length; ++i) {
063 for (Handle<E,P,C,K> h: handles[i]) {
064 if (event==h.getEvent()) {
065 ((EventDispatchJoinContext<E, P, C, H, S>) context).consumeJoin(i);
066 return;
067 }
068 }
069 }
070 };
071
072 @Override
073 public void consumeJoin(int index) {
074 for (int i=0; i<mappers.length; ++i) {
075 for (int idx: mappers[i].getIndexMap()) {
076 if (idx==index) {
077 ((EventDispatchJoinContext<E, P, C, H, S>) context).consumeJoin(i);
078 return;
079 }
080 }
081 }
082 }
083
084 };
085
086 target.post(mappingContext, inferenceContext, targetInput);
087 }
088
089 @Override
090 public void reset() {
091 target.reset();
092 }
093
094 @Override
095 public String toString() {
096 return "MappingEventHandler [target=" + target + ", mappers=" + Arrays.toString(mappers) + "]";
097 }
098
099 @Override
100 public boolean isOneOff() {
101 return target.isOneOff();
102 }
103
104 @Override
105 public Mode getMode() {
106 return target.getMode();
107 }
108
109 @Override
110 public Predicate<E, C> getPredicate() {
111 throw new UnsupportedOperationException(); // TODO - Map target predicate if needed.
112 }
113
114
115 }