001 package com.hammurapi.flow.runtime.impl;
002
003 import java.lang.reflect.Array;
004 import java.util.Collection;
005 import java.util.Iterator;
006 import java.util.logging.Level;
007 import java.util.logging.Logger;
008
/**
 * Helper class to join data from several inputs.
 * <p>
 * Each input index has its own collector. When a new input is added through
 * {@link #addInput(int, Object, Object)}, it is combined with the inputs accumulated
 * in the other collectors and {@link #join(Object[], Object, InputConsumer, int)} is
 * invoked for the resulting combinations.
 *
 * @author Pavel Vlasov
 * @param <T> Join element type.
 * @param <C> Context type passed from addInput to join.
 * @param <R> Return type passed from join to addInput.
 */
public abstract class Joiner<T, C, R> {
    private static final Logger logger = Logger.getLogger(Joiner.class.getName());

    /**
     * Handle passed to join() which allows the join implementation to mark
     * inputs of the current combination as consumed.
     * @author Pavel Vlasov
     */
    public static class InputConsumer {

        /** One flag per input index; true means the input at that index was consumed. */
        boolean[] consumeFlags;

        InputConsumer(int size) {
            consumeFlags = new boolean[size];
        }

        /**
         * Consumes input at given index.
         * @param index Index of the input to mark as consumed.
         */
        public void consume(int index) {
            consumeFlags[index] = true;
        }

        /** Clears all flags so that they describe only the next combination. */
        private void reset() {
            for (int i = 0; i < consumeFlags.length; ++i) {
                consumeFlags[i] = false;
            }
        }
    }

    private final Collection<T>[] inputCollectors;
    private final boolean outerJoin;
    private final Class<T> inputType;

    /**
     * Creates joiner.
     * @param inputCollectors Input collectors, one per input index.
     * @param inputType Input type is required because of erasure of generics and T is needed at runtime.
     * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors
     * don't have any data.
     */
    protected Joiner(Collection<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) {
        this.inputCollectors = inputCollectors;
        this.outerJoin = outerJoin;
        this.inputType = inputType;
    }

    /**
     * Adds input. If the input is valid and its collector accepts it, the input is joined
     * with the inputs accumulated in the other collectors (and, for outer joins, with
     * <code>null</code> in place of the other inputs). If the join consumed the new input,
     * it is removed from its collector again.
     * @param index Input index.
     * @param input Input.
     * @param context Context passed through to join().
     * @return Value propagated from join(), or null if the input was not valid, was not
     * accepted by its collector, or no join result was propagated.
     * @throws Exception Propagated from join() or partialJoin().
     */
    public synchronized R addInput(int index, T input, C context) throws Exception {
        // Checked on every call so that runtime changes of the logger level are honored.
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("New join input: "+input+" at index "+index);
            StringBuilder sb = new StringBuilder();
            sb.append("Input collectors: ");
            for (Collection<T> c : inputCollectors) {
                sb.append(c.size()).append(' ');
            }
            logger.fine(sb.toString());
        }

        if (!isValidInput(index, input)) {
            return null;
        }

        if (!inputCollectors[index].add(input)) { // Join only if successfully added - for sets.
            return null;
        }

        // Array.newInstance() returns Object; the array is created with component type inputType.
        @SuppressWarnings("unchecked")
        T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length);
        inputs[index] = input;
        InputConsumer consumer = new InputConsumer(inputCollectors.length);
        R ret = join(inputs, 0, index, context, consumer);
        if (consumer.consumeFlags[index]) { // Current input was consumed.
            inputCollectors[index].remove(input);
        }
        return ret;
    }

    /**
     * Recursively builds combinations of inputs, one index per recursion level, and invokes
     * the abstract join() once all positions are filled.
     * @param inputs Combination being built; positions below currentIndex are already filled.
     * @param currentIndex Index to fill at this level.
     * @param inputIndex Index of the newly added input; its position is never iterated.
     * @param context Context passed through to join().
     * @param consumer Shared consume flags.
     * @return Value returned by join() for the combination which caused iteration to stop, or null.
     */
    private R join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer) throws Exception {
        // Flags must describe only the combination evaluated by this descent. Without clearing
        // them here, a descent which never reaches join() (partial join rejected, empty collector)
        // would leave flags of a previous combination in place and the caller would remove
        // an element which was not consumed or stop iterating prematurely.
        consumer.reset();

        if (currentIndex>0 && !partialJoin(inputs, currentIndex-1)) {
            return null;
        }

        if (currentIndex==inputs.length) {
            return join(inputs, context, consumer, inputIndex);
        }

        if (currentIndex==inputIndex) { // Position of the new input is already filled.
            return join(inputs, currentIndex+1, inputIndex, context, consumer);
        }

        if (outerJoin) {
            inputs[currentIndex]=null;
            R ret = join(inputs, currentIndex+1, inputIndex, context, consumer);
            if (shouldStopIteration(consumer, currentIndex, inputIndex)) {
                return ret;
            }
        }

        Iterator<T> it = inputCollectors[currentIndex].iterator();
        while (it.hasNext()) {
            inputs[currentIndex]=it.next();
            if (isValidInput(currentIndex, inputs[currentIndex])) {
                R ret = join(inputs, currentIndex+1, inputIndex, context, consumer);
                if (consumer.consumeFlags[currentIndex]) { // This input was consumed.
                    it.remove();
                }
                if (shouldStopIteration(consumer, currentIndex, inputIndex)) {
                    return ret;
                }
            }
        }

        return null;
    }

    /**
     * Tells whether iteration at the given level shall stop because the new input
     * or an input at a lower index was consumed by the last combination.
     */
    private static boolean shouldStopIteration(InputConsumer consumer, int currentIndex, int inputIndex) {
        if (consumer.consumeFlags[inputIndex]) { // Input was consumed
            return true;
        }
        for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
            if (consumer.consumeFlags[i]) {
                return true;
            }
        }
        return false;
    }

    /**
     * This method is invoked for every combination of valid inputs which passed partialJoin().
     * @param inputs Inputs, one per input index. For outer joins elements other than
     * the activator may be null.
     * @param context Context passed to addInput().
     * @param consumer Use {@link InputConsumer#consume(int)} to mark inputs as "consumed".
     * A consumed input gets removed from internal collections and doesn't participate
     * in further joins.
     * @param activator Index of the input whose addition triggered this join.
     * @return Value to pass back to addInput(). With more than one collector it reaches
     * the caller of addInput() only if this combination consumed the activator input.
     * @throws Exception
     */
    protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception;

    /**
     * This method is invoked for all inputs before joining. It is useful for situation when
     * inputs in internal collections may become invalid over the course of life of joiner.
     * @param index Input index.
     * @param input Input to validate.
     * @return true if the input may participate in joins.
     */
    protected abstract boolean isValidInput(int index, T input);

    /**
     * This method is invoked to validate already joined inputs.
     * If this method returns false, further joining is abandoned.
     * @param inputs Inputs array.
     * @param index Index of last already joined input.
     * @return false to abandon the current combination.
     * @throws Exception
     */
    protected abstract boolean partialJoin(T[] inputs, int index) throws Exception;

}