1 | package com.hammurapi.common; |
2 | |
3 | import java.lang.reflect.Array; |
4 | import java.util.ArrayList; |
5 | import java.util.Collection; |
6 | import java.util.HashMap; |
7 | import java.util.Iterator; |
8 | import java.util.List; |
9 | import java.util.Map; |
10 | import java.util.logging.Level; |
11 | import java.util.logging.Logger; |
12 | |
13 | import com.hammurapi.common.Joiner.Collector.CollectorIterator; |
14 | import com.hammurapi.extract.CommutativeAnd; |
15 | import com.hammurapi.extract.Extractor; |
16 | import com.hammurapi.extract.Predicate; |
17 | |
18 | /** |
19 | * Helper class to join data from several inputs. |
20 | * @author Pavel Vlasov |
21 | * @param <T> Join element type. |
22 | * @param <C> Context type passed from addInput to join. |
23 | * @param <R> Return type passed from join to to addInput. |
24 | */ |
25 | public abstract class Joiner<T, C, R> { |
26 | |
27 | /** |
28 | * Collector of objects to join. Implements a subset of collection |
29 | * methods. |
30 | * @author Pavel Vlasov |
31 | * |
32 | * @param <T> |
33 | */ |
34 | public interface Collector<T> extends Iterable<T> { |
35 | |
36 | /** |
37 | * Collector iterator. <code>iterator()</code> may return iterator |
38 | * of this type for collectors which hold resource, e.g. files or |
39 | * database cursors. |
40 | * @author Pavel Vlasov |
41 | * |
42 | * @param <T> |
43 | */ |
44 | interface CollectorIterator<T> extends Iterator<T> { |
45 | |
46 | /** |
47 | * Close iterator to release resources. |
48 | */ |
49 | void close(); |
50 | |
51 | } |
52 | |
53 | /** |
54 | * Adds new object to the collector. |
55 | * @param obj |
56 | * @return true if object was added to the collector. |
57 | */ |
58 | boolean add(T obj); |
59 | |
60 | /** |
61 | * Removes object (last entry) from the collector. |
62 | * @param obj |
63 | * @return true if success |
64 | */ |
65 | boolean remove(T obj); |
66 | |
67 | /** |
68 | * Removes all object from the collector. |
69 | */ |
70 | void clear(); |
71 | |
72 | } |
73 | |
74 | /** |
75 | * Adapter for collections. |
76 | * @author Pavel Vlasov |
77 | * |
78 | * @param <T> |
79 | */ |
80 | public static class CollectionAdapter<T> implements Collector<T> { |
81 | |
82 | private Collection<T> master; |
83 | |
84 | public CollectionAdapter(Collection<T> master) { |
85 | this.master = master; |
86 | } |
87 | |
88 | public boolean add(T obj) { |
89 | return master.add(obj); |
90 | } |
91 | |
92 | public void clear() { |
93 | master.clear(); |
94 | } |
95 | |
96 | public boolean remove(T obj) { |
97 | return master.remove(obj); |
98 | } |
99 | |
100 | public Iterator<T> iterator() { |
101 | return master.iterator(); |
102 | } |
103 | |
104 | @Override |
105 | public String toString() { |
106 | return "CollectionAdapter [master=" + master + "]"; |
107 | } |
108 | |
109 | } |
110 | |
111 | private static final Logger logger = Logger.getLogger(Joiner.class.getName()); |
112 | |
113 | /** |
114 | * Interface to consume inputs in join() method. |
115 | * @author Pavel Vlasov |
116 | */ |
117 | public static class InputConsumer { |
118 | |
119 | boolean[] consumeFlags; |
120 | |
121 | InputConsumer(int size) { |
122 | consumeFlags = new boolean[size]; |
123 | } |
124 | |
125 | /** |
126 | * Consumes input at given index. |
127 | * @param index |
128 | */ |
129 | public void consume(int index) { |
130 | consumeFlags[index] = true; |
131 | } |
132 | } |
133 | |
134 | private Collector<T>[] inputCollectors; |
135 | private boolean outerJoin; |
136 | private Class<T> inputType; |
137 | private boolean isFine; |
138 | |
139 | /** |
140 | * Predicates to be fired at a particular index. |
141 | */ |
142 | private Map<Integer, List<Predicate<T, C>>> predicates = new HashMap<Integer, List<Predicate<T, C>>>(); |
143 | |
144 | /** |
145 | * Adds join predicate. |
146 | * @param predicate |
147 | */ |
148 | public void addPredicate(Predicate<T, C> predicate) { |
149 | if (predicate instanceof CommutativeAnd) { |
150 | // Break down the predicate. |
151 | for (Predicate<T,C> part: ((CommutativeAnd<T,C>) predicate).getParts()) { |
152 | addPredicate(part); |
153 | } |
154 | |
155 | } |
156 | |
157 | int maxIndex = 0; |
158 | for (Integer idx: predicate.parameterIndices()) { |
159 | if (idx>maxIndex) { |
160 | maxIndex = idx; |
161 | } |
162 | } |
163 | List<Predicate<T, C>> pl = predicates.get(maxIndex); |
164 | if (pl==null) { |
165 | pl = new ArrayList<Predicate<T, C>>(); |
166 | predicates.put(maxIndex, pl); |
167 | } |
168 | pl.add(predicate); |
169 | } |
170 | |
171 | /** |
172 | * Creates joiner. |
173 | * @param inputCollectors Input collectors. |
174 | * @param inputType Input type is required because of erasure of generics and T is needed at runtime. |
175 | * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors |
176 | * don't have any data. |
177 | */ |
178 | protected Joiner(Collector<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) { |
179 | this.inputCollectors = inputCollectors; |
180 | this.outerJoin = outerJoin; |
181 | this.inputType = inputType; |
182 | this.isFine = logger.isLoggable(Level.FINE); |
183 | } |
184 | |
185 | /** |
186 | * Start join is invoked before join. Typically this method shall |
187 | * obtain a lock on input collectors. |
188 | */ |
189 | protected abstract void startJoin(); |
190 | |
191 | /** |
192 | * This method is invoked after join is finished. Typically this method |
193 | * shall release the lock on input collectors. |
194 | */ |
195 | protected abstract void endJoin(); |
196 | |
197 | /** |
198 | * Adds input. Input is joined with other accumulated inputs. |
199 | * |
200 | * @param index Input index. |
201 | * @param input Input. |
202 | */ |
203 | @SuppressWarnings("unchecked") |
204 | public List<R> addInput(int index, T input, C context) throws Exception { |
205 | startJoin(); |
206 | try { |
207 | if (isFine) { |
208 | logger.fine("New join input: "+input+" at index "+index); |
209 | StringBuilder sb = new StringBuilder(); |
210 | sb.append("Input collectors: "); |
211 | for (Collector<T> c:inputCollectors) { |
212 | sb.append(c+" "); |
213 | } |
214 | logger.fine(sb.toString()); |
215 | } |
216 | if (isValidInput(index, input)) { |
217 | if (inputCollectors[index].add(input)) { // Join only if successfully added - for sets. |
218 | T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length); |
219 | inputs[index] = input; |
220 | InputConsumer consumer = new InputConsumer(inputCollectors.length); |
221 | List<R> ret = new ArrayList<R>(); |
222 | Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<T,? super Boolean,C>,? super Boolean>>(); |
223 | join(inputs, 0, index, context, consumer, cache , ret); |
224 | if (consumer.consumeFlags[index]) { // Current input was consumed. |
225 | if (!inputCollectors[index].remove(input)) { |
226 | throw new IllegalStateException("Cannot remove just added input"); |
227 | } |
228 | } |
229 | return ret; |
230 | } |
231 | } |
232 | |
233 | return null; |
234 | } finally { |
235 | endJoin(); |
236 | } |
237 | } |
238 | |
239 | private void join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer, Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache, List<R> resultCollector) throws Exception { |
240 | // System.out.println(this+" Join "+currentIndex+" "+Arrays.toString(inputs)); |
241 | if (currentIndex>0) { |
242 | |
243 | List<Predicate<T, C>> pl = predicates.get(currentIndex-1); |
244 | if (pl!=null) { |
245 | for (Predicate<T, C> predicate: pl) { |
246 | if (!predicate.extract(context, cache, inputs)) { |
247 | return; |
248 | } |
249 | } |
250 | } |
251 | |
252 | if (!partialJoin(inputs, currentIndex-1, consumer)) { |
253 | return; |
254 | } |
255 | } |
256 | |
257 | if (currentIndex==inputs.length) { |
258 | for (int i=0; i<consumer.consumeFlags.length; ++i) { |
259 | consumer.consumeFlags[i] = false; |
260 | } |
261 | resultCollector.add(join(inputs, context, consumer, inputIndex)); |
262 | } else if (currentIndex==inputIndex) { |
263 | join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector); |
264 | } else { |
265 | if (outerJoin) { |
266 | inputs[currentIndex]=null; |
267 | join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector); |
268 | if (consumer.consumeFlags[inputIndex]) { // Input was consumed |
269 | return; |
270 | } |
271 | for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here. |
272 | if (consumer.consumeFlags[i]) { |
273 | return; |
274 | } |
275 | } |
276 | } |
277 | |
278 | Iterator<T> it = inputCollectors[currentIndex].iterator(); |
279 | try { |
280 | while (it.hasNext()) { |
281 | inputs[currentIndex]=it.next(); |
282 | if (isValidInput(currentIndex, inputs[currentIndex])) { |
283 | join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector); |
284 | if (consumer.consumeFlags[currentIndex]) { // This input was consumed. |
285 | it.remove(); |
286 | return; |
287 | } |
288 | |
289 | if (consumer.consumeFlags[inputIndex]) { // Input was consumed |
290 | return; |
291 | } |
292 | |
293 | for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here. |
294 | if (consumer.consumeFlags[i]) { |
295 | return; |
296 | } |
297 | } |
298 | } |
299 | } |
300 | } finally { |
301 | if (it instanceof CollectorIterator) { |
302 | ((CollectorIterator<?>) it).close(); |
303 | } |
304 | } |
305 | } |
306 | } |
307 | |
308 | /** |
309 | * This method is invoked for every combination of valid inputs. |
310 | * @param inputs Inputs |
311 | * @param context Join context passed from addInput |
312 | * @param consumer callback interface to consume inputs. |
313 | * @param activator Index of join activator. |
314 | * @return Join result. |
315 | * @throws Exception |
316 | */ |
317 | protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception; |
318 | |
319 | /** |
320 | * This method is invoked for all inputs before joining. It is useful for situation when |
321 | * inputs in internal collections may become invalid over the course of life of joiner. |
322 | * @param index |
323 | * @param input |
324 | * @return |
325 | */ |
326 | protected boolean isValidInput(int index, T input) { |
327 | return true; |
328 | } |
329 | |
330 | /** |
331 | * This method is invoked to validate already joined inputs. |
332 | * If this method returns false, further joining is abandoned. |
333 | * @param inputs Inputs array. |
334 | * @param index Index of last already joined input. |
335 | * @param consumer Callback interface to consume inputs. |
336 | * @return |
337 | */ |
338 | protected boolean partialJoin(T[] inputs, int index, InputConsumer consumer) throws Exception { |
339 | return true; |
340 | } |
341 | |
342 | /** |
343 | * Clears join collections. |
344 | */ |
345 | public void reset() { |
346 | for (Collector<T> collector: inputCollectors) { |
347 | collector.clear(); |
348 | } |
349 | } |
350 | |
351 | } |