001    package com.hammurapi.flow.runtime.impl;
002    
003    import java.lang.reflect.Array;
004    import java.util.Collection;
005    import java.util.Iterator;
006    import java.util.logging.Level;
007    import java.util.logging.Logger;
008    
/**
 * Helper class to join data from several inputs.
 * <p>
 * Each input has its own collector. Whenever a new element is added through
 * {@link #addInput(int, Object, Object)}, it is combined with the elements accumulated
 * in the other collectors and {@link #join(Object[], Object, InputConsumer, int)} is invoked
 * for the resulting combinations. Combinations can be pruned early with
 * {@link #partialJoin(Object[], int)}, and elements can be removed from the collectors by
 * "consuming" them through the {@link InputConsumer}.
 *
 * @author Pavel Vlasov
 * @param <T> Join element type.
 * @param <C> Context type passed from addInput to join.
 * @param <R> Return type passed from join to addInput.
 */
public abstract class Joiner<T, C, R> {
        private static final Logger logger = Logger.getLogger(Joiner.class.getName());

        /**
         * Callback object handed to join() so that it can mark inputs as consumed.
         * A consumed input is removed from its collector and does not take part in further joins.
         * @author Pavel Vlasov
         */
        public static class InputConsumer {

                /** One flag per input index; true means "consumed by the most recent join() invocation". */
                boolean[] consumeFlags;

                InputConsumer(int size) {
                        consumeFlags = new boolean[size];
                }

                /**
                 * Consumes input at given index.
                 * @param index Index of the input to consume.
                 * @throws ArrayIndexOutOfBoundsException if index is outside of the inputs range.
                 */
                public void consume(int index) {
                        consumeFlags[index] = true;
                }

                /**
                 * Clears all consume flags so that flags set for one combination
                 * cannot leak into the evaluation of the next one.
                 */
                void reset() {
                        for (int i = 0; i < consumeFlags.length; ++i) {
                                consumeFlags[i] = false;
                        }
                }
        }

        private final Collection<T>[] inputCollectors;
        private final boolean outerJoin;
        private final Class<T> inputType;
        // Evaluated once at construction time; later changes of the logger level are not picked up.
        private final boolean isFine;

        /**
         * Creates joiner.
         * @param inputCollectors Input collectors, one per input index.
         * @param inputType Input type is required because of erasure of generics and T is needed at runtime.
         * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors
         * don't have any data.
         */
        protected Joiner(Collection<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) {
                this.inputCollectors = inputCollectors;
                this.outerJoin = outerJoin;
                this.inputType = inputType;
                this.isFine = logger.isLoggable(Level.FINE);
        }

        /**
         * Adds input. If the input is valid and was accepted by its collector, it is joined
         * with the inputs accumulated in the other collectors. If the input is consumed by
         * a join, it is removed from its collector again.
         * @param index Input index.
         * @param input Input.
         * @param context Context passed through to join().
         * @return Value returned by the join() invocation that ended the joining (the one which
         * consumed this input, or the only invocation if there are no other inputs to iterate over);
         * null if the input was invalid, was not added to the collector, or was not consumed.
         * @throws Exception Propagated from join() and partialJoin().
         */
        @SuppressWarnings("unchecked")
        public synchronized R addInput(int index, T input, C context) throws Exception {
                if (isFine) {
                        logger.fine("New join input: "+input+" at index "+index);
                        StringBuilder sb = new StringBuilder();
                        sb.append("Input collectors: ");
                        for (Collection<T> c:inputCollectors) {
                                sb.append(c.size()).append(' ');
                        }
                        logger.fine(sb.toString());
                }
                if (isValidInput(index, input)) {
                        if (inputCollectors[index].add(input)) { // Join only if successfully added - for sets.
                                T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length);
                                inputs[index] = input;
                                InputConsumer consumer = new InputConsumer(inputCollectors.length);
                                R ret = join(inputs, 0, index, context, consumer);
                                if (consumer.consumeFlags[index]) { // Current input was consumed.
                                        inputCollectors[index].remove(input);
                                }
                                return ret;
                        }
                }

                return null;
        }

        /**
         * Recursively fills the inputs array, one index per recursion level, and invokes
         * the abstract join() once all positions are filled.
         * @param inputs Combination being built.
         * @param currentIndex Index to fill at this level; inputs.length means the combination is complete.
         * @param inputIndex Index of the input which triggered joining; its slot is already filled.
         * @param context Context passed through to join().
         * @param consumer Consumer shared by all levels of the recursion.
         * @return Result of the join() invocation which caused unwinding, or null.
         */
        private R join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer) throws Exception {
                if (currentIndex>0 && !partialJoin(inputs, currentIndex-1)) {
                        return null;
                }

                if (currentIndex==inputs.length) {
                        consumer.reset();
                        return join(inputs, context, consumer, inputIndex);
                }

                if (currentIndex==inputIndex) {
                        // This slot holds the newly added input - nothing to iterate over.
                        return join(inputs, currentIndex+1, inputIndex, context, consumer);
                }

                if (outerJoin) {
                        inputs[currentIndex]=null;
                        // Flags left over from a previous combination must not be mistaken for
                        // results of this descent, which may be cut short by partialJoin().
                        consumer.reset();
                        R ret = join(inputs, currentIndex+1, inputIndex, context, consumer);
                        if (mustUnwind(consumer, currentIndex, inputIndex)) {
                                return ret;
                        }
                }

                Iterator<T> it = inputCollectors[currentIndex].iterator();
                while (it.hasNext()) {
                        T candidate = it.next();
                        inputs[currentIndex]=candidate;
                        if (isValidInput(currentIndex, candidate)) {
                                // Without this reset a flag set for the previous candidate would survive
                                // a descent rejected by partialJoin() and the current candidate
                                // would be removed although it was never consumed.
                                consumer.reset();
                                R ret = join(inputs, currentIndex+1, inputIndex, context, consumer);
                                if (consumer.consumeFlags[currentIndex]) { // This input was consumed.
                                        it.remove();
                                }
                                if (mustUnwind(consumer, currentIndex, inputIndex)) {
                                        return ret;
                                }
                        }
                }

                return null;
        }

        /**
         * Tells whether iteration at the given level shall stop because the triggering input
         * or an input at a lower index was consumed, i.e. the current partial combination is gone.
         */
        private static boolean mustUnwind(InputConsumer consumer, int currentIndex, int inputIndex) {
                if (consumer.consumeFlags[inputIndex]) { // Input was consumed
                        return true;
                }
                for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
                        if (consumer.consumeFlags[i]) {
                                return true;
                        }
                }
                return false;
        }

        /**
         * This method is invoked for every combination of valid inputs.
         * @param inputs Inputs. In outer join mode elements other than the activator may be null.
         * @param context Context passed to addInput().
         * @param consumer Consumer to mark inputs as "consumed". Consumed inputs get removed
         * from internal collections and don't participate in further joins.
         * @param activator Index of the input which was added and triggered this join.
         * @return Join result. It is passed back to the caller of addInput() only if this
         * invocation ends the joining, e.g. by consuming the activator input.
         * @throws Exception
         */
        protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception;

        /**
         * This method is invoked for all inputs before joining. It is useful for situation when
         * inputs in internal collections may become invalid over the course of life of joiner.
         * @param index Input index.
         * @param input Input to validate.
         * @return true if the input shall participate in joins.
         */
        protected abstract boolean isValidInput(int index, T input);

        /**
         * This method is invoked to validate already joined inputs.
         * If this method returns false, further joining is abandoned.
         * @param inputs Inputs array.
         * @param index Index of last already joined input.
         * @return true to continue joining with the current partial combination.
         * @throws Exception
         */
        protected abstract boolean partialJoin(T[] inputs, int index) throws Exception;

}