001package com.hammurapi.common;
002
003import java.lang.reflect.Array;
004import java.util.ArrayList;
005import java.util.Collection;
006import java.util.HashMap;
007import java.util.Iterator;
008import java.util.List;
009import java.util.Map;
010import java.util.concurrent.ConcurrentHashMap;
011import java.util.logging.Level;
012import java.util.logging.Logger;
013
014import com.hammurapi.extract.CommutativeAnd;
015import com.hammurapi.extract.Extractor;
016import com.hammurapi.extract.Predicate;
017
018/**
019 * Helper class to join data from several inputs.
020 * @author Pavel Vlasov
021 * @param <T> Join element type.
022 * @param <C> Context type passed from addInput to join.
 * @param <R> Return type passed from join to addInput.
024 */
025public abstract class Joiner<T, C, R> {   
026        
027        /**
028         * Collector of objects to join. Implements a subset of collection 
029         * methods. 
030         * @author Pavel Vlasov
031         *
032         * @param <T>
033         */
034        public interface Collector<T> extends Iterable<T> {
035                                
036                /**
037                 * Adds new object to the collector. This method shall not break currently open iterators.
038                 * @param obj
039                 * @return true if object was added to the collector.
040                 */
041                boolean add(T obj);
042                
043                /**
044                 * Removes object (last entry) from the collector.
045                 * @param obj
046                 * @return true if success
047                 */
048                boolean remove(T obj);
049                
050                /**
051                 * Removes all object from the collector.
052                 */
053                void clear();
054                
055        }
056        
057        /**
058         * Adapter for collections.
059         * @author Pavel Vlasov
060         *
061         * @param <T>
062         */
063        public static class CollectionAdapter<T> implements Collector<T> {
064                
065                private Collection<T> master;
066                
067                public CollectionAdapter(Collection<T> master) {
068                        this.master = master;
069                }
070
071                public boolean add(T obj) {
072                        return master.add(obj);
073                }
074
075                public void clear() {
076                        master.clear();                 
077                }
078
079                public boolean remove(T obj) {
080                        return master.remove(obj);                      
081                }
082
083                public Iterator<T> iterator() {
084                        return master.iterator();
085                }
086
087                @Override
088                public String toString() {
089                        return "CollectionAdapter [master=" + master + "]";
090                }
091                
092        }
093        
094        private static final Logger logger = Logger.getLogger(Joiner.class.getName());
095        
096        /**
097         * Interface to consume inputs in join() method.
098         * @author Pavel Vlasov
099         */
100        public static class InputConsumer {
101                
102                boolean[] consumeFlags;
103
104                InputConsumer(int size) {
105                        consumeFlags = new boolean[size];
106                }
107                
108                /**
109                 * Consumes input at given index.
110                 * @param index
111                 */
112                public void consume(int index) {
113                        consumeFlags[index] = true;
114                }
115        }
116        
117        private Collector<T>[] inputCollectors;
118        private boolean outerJoin;
119        private Class<T> inputType;
120        private boolean isFine;
121        
122        /**
123         * Predicates to be fired at a particular index.
124         */
125        private Map<Integer, List<Predicate<T, C>>> predicates = new HashMap<Integer, List<Predicate<T, C>>>();
126        
127        /**
128         * Adds join predicate.
129         * @param predicate
130         */
131        public void addPredicate(Predicate<T, C> predicate) {
132                if (predicate instanceof CommutativeAnd) {
133                        // Break down the predicate.
134                        for (Predicate<T,C> part: ((CommutativeAnd<T,C>) predicate).getParts()) {
135                                addPredicate(part);
136                        }               
137                } else {                
138                        int maxIndex = 0;
139                        for (Integer idx: predicate.parameterIndices()) {
140                                if (idx>maxIndex) {
141                                        maxIndex = idx;
142                                }
143                        }
144                        List<Predicate<T, C>> pl = predicates.get(maxIndex);
145                        if (pl==null) {
146                                pl = new ArrayList<Predicate<T, C>>();
147                                predicates.put(maxIndex, pl);
148                        }
149                        pl.add(predicate);
150                }
151        }
152
153        /**
154         * Creates joiner.
155         * @param inputCollectors Input collectors.
156         * @param inputType Input type is required because of erasure of generics and T is needed at runtime.
157         * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors
158         * don't have any data.
159         */
160        protected Joiner(Collector<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) {
161                this.inputCollectors = inputCollectors;
162                this.outerJoin = outerJoin;
163                this.inputType = inputType;
164                this.isFine = logger.isLoggable(Level.FINE);
165        }
166        
167        /**
168         * Start join is invoked before join. Typically this method shall
169         * obtain a lock on input collectors.
170         */
171        protected abstract void startJoin();
172        
173        /**
174         * This method is invoked after join is finished. Typically this method
175         * shall release the lock on input collectors.
176         */
177        protected abstract void endJoin();
178        
179        /**
180         * Adds input. Input is joined with other accumulated inputs.
181         *  
182         * @param index Input index. 
183         * @param input Input.
184         */
185        @SuppressWarnings("unchecked")
186        public List<R> addInput(int index, T input, C context) throws Exception {
187                if (isFine) {
188                        logger.fine("New join input: "+input+" at index "+index);
189                        StringBuilder sb = new StringBuilder();
190                        sb.append("Input collectors: ");
191                        for (Collector<T> c:inputCollectors) {
192                                sb.append(c+" ");
193                        }
194                        logger.fine(sb.toString());
195                }
196                if (isValidInput(index, input)) {
197                        startJoin();
198                        try {
199                                if (inputCollectors[index].add(input)) { // Join only if successfully added - for sets.
200                                        T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length);
201                                        inputs[index] = input;
202                                        InputConsumer consumer = new InputConsumer(inputCollectors.length);                     
203                                        List<R> ret = new ArrayList<R>();
204                                        Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache = new ConcurrentHashMap<C, Map<Extractor<T,? super Boolean,C>,? super Boolean>>(); 
205                                        join(inputs, 0, index, context, consumer, cache , ret);
206                                        if (consumer.consumeFlags[index]) { // Current input was consumed.
207                                                if (!inputCollectors[index].remove(input)) {
208                                                        throw new IllegalStateException("Cannot remove just added input");
209                                                }
210                                        }
211                                        return ret;
212                                }
213                        } finally {
214                                endJoin();
215                        }
216                }
217                return null;
218        }
219        
220        /**
221         * This method can be overridden to implement concurrent joining, i.e. at particular indices the sub-method wraps call to the super-method
222         * into a task executed in a separate thread.
223         * @param inputs
224         * @param currentIndex
225         * @param inputIndex
226         * @param context
227         * @param consumer
228         * @param cache
229         * @param resultCollector
230         * @throws Exception
231         */
232        protected void join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer, Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache, List<R> resultCollector) throws Exception {
233//              System.out.println(this+" Join "+currentIndex+" "+Arrays.toString(inputs));
234                if (currentIndex<inputs.length) {
235                        
236                        List<Predicate<T, C>> pl = predicates.get(currentIndex);
237                        if (pl!=null) {
238                                for (Predicate<T, C> predicate: pl) {
239                                        if (!predicate.extract(context, cache, inputs)) {
240                                                return;
241                                        }
242                                }
243                        }
244                        
245                        if (!partialJoin(inputs, context, consumer, currentIndex, inputIndex)) {
246                                return;
247                        }
248                }
249                
250                if (currentIndex==inputs.length) {
251                        for (int i=0; i<consumer.consumeFlags.length; ++i) {
252                                consumer.consumeFlags[i] = false;
253                        }
254                        R joinResult = join(inputs, context, consumer, inputIndex);
255                        synchronized (resultCollector) { // For concurrent joins.
256                                resultCollector.add(joinResult);
257                        }
258                } else if (currentIndex==inputIndex) {
259                        join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
260                } else {                        
261                        if (outerJoin) {
262                                inputs[currentIndex]=null;
263                                join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
264                                if (consumer.consumeFlags[inputIndex]) { // Input was consumed
265                                        return;
266                                }
267                                for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
268                                        if (consumer.consumeFlags[i]) {
269                                                return;
270                                        }
271                                }
272                        }
273                        
274                        Iterator<T> it = inputCollectors[currentIndex].iterator();
275                        try {
276                                while (it.hasNext()) {
277                                        inputs[currentIndex]=it.next();
278                                        if (isValidInput(currentIndex, inputs[currentIndex])) {
279                                                join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
280                                                if (consumer.consumeFlags[currentIndex]) { // This input was consumed.
281                                                        it.remove();                                    
282                                                        return;
283                                                }
284                                                
285                                                if (consumer.consumeFlags[inputIndex]) { // Input was consumed
286                                                        return;
287                                                }
288                                                
289                                                for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
290                                                        if (consumer.consumeFlags[i]) {
291                                                                return;
292                                                        }
293                                                }
294                                        } else {
295                                                if (consumer.consumeFlags[currentIndex]) { // This input is not valid anymore.
296                                                        it.remove();                                    
297                                                        return;
298                                                }
299                                        }
300                                }
301                        } finally {
302                                if (it instanceof AutoCloseable) {
303                                        ((AutoCloseable) it).close();
304                                }
305                        }
306                }               
307        }
308        
309        /**
310         * This method is invoked for every combination of valid inputs.
311         * @param inputs Inputs
312         * @param context Join context passed from addInput
313         * @param consumer callback interface to consume inputs.
314         * @param activator Index of join activator.
315         * @return Join result. 
316         * @throws Exception
317         */
318        protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception;
319        
320        /**
321         * This method is invoked for all inputs before joining. It is useful for situation when 
322         * inputs in internal collections may become invalid over the course of life of joiner.
323         * @param index
324         * @param input
325         * @return
326         */
327        protected boolean isValidInput(int index, T input) {
328                return true;
329        }
330        
331        /**
332         * This method is invoked to validate already joined inputs.
333         * If this method returns false, further joining is abandoned.
334         * @param inputs Inputs array.
335         * @param index Index of last already joined input.
336         * @param consumer Callback interface to consume inputs. 
337         * @return
338         */
339        protected boolean partialJoin(T[] inputs, C context, InputConsumer consumer, int index, int activator) throws Exception {
340                return true;
341        }
342        
343        /**
344         * Clears join collections.
345         */
346        public void reset() {
347                for (Collector<T> collector: inputCollectors) {
348                        collector.clear();
349                }
350        }
351
352}