EMMA Coverage Report (generated Thu Jan 20 11:39:44 EST 2011)
[all classes][com.hammurapi.common]

COVERAGE SUMMARY FOR SOURCE FILE [Joiner.java]

nameclass, %method, %block, %line, %
Joiner.java100% (3/3)75%  (12/16)67%  (343/513)73%  (76.5/105)

COVERAGE BREAKDOWN BY CLASS AND METHOD

nameclass, %method, %block, %line, %
     
class Joiner$CollectionAdapter100% (1/1)50%  (3/6)43%  (15/35)56%  (5/9)
clear (): void 0%   (0/1)0%   (0/4)0%   (0/2)
remove (Object): boolean 0%   (0/1)0%   (0/5)0%   (0/1)
toString (): String 0%   (0/1)0%   (0/11)0%   (0/1)
Joiner$CollectionAdapter (Collection): void 100% (1/1)100% (6/6)100% (3/3)
add (Object): boolean 100% (1/1)100% (5/5)100% (1/1)
iterator (): Iterator 100% (1/1)100% (4/4)100% (1/1)
     
class Joiner100% (1/1)88%  (7/8)68%  (315/465)73%  (66.5/91)
reset (): void 0%   (0/1)0%   (0/20)0%   (0/3)
addInput (int, Object, Object): List 100% (1/1)55%  (76/138)66%  (15.7/24)
join (Object [], int, int, Object, Joiner$InputConsumer, Map, List): void 100% (1/1)75%  (159/211)72%  (28.8/40)
addPredicate (Predicate): void 100% (1/1)75%  (49/65)85%  (11/13)
<static initializer> 100% (1/1)100% (5/5)100% (2/2)
Joiner (Joiner$Collector [], Class, boolean): void 100% (1/1)100% (22/22)100% (7/7)
isValidInput (int, Object): boolean 100% (1/1)100% (2/2)100% (1/1)
partialJoin (Object [], int, Joiner$InputConsumer): boolean 100% (1/1)100% (2/2)100% (1/1)
     
class Joiner$InputConsumer100% (1/1)100% (2/2)100% (13/13)100% (5/5)
Joiner$InputConsumer (int): void 100% (1/1)100% (7/7)100% (3/3)
consume (int): void 100% (1/1)100% (6/6)100% (2/2)

1package com.hammurapi.common;
2 
3import java.lang.reflect.Array;
4import java.util.ArrayList;
5import java.util.Collection;
6import java.util.HashMap;
7import java.util.Iterator;
8import java.util.List;
9import java.util.Map;
10import java.util.logging.Level;
11import java.util.logging.Logger;
12 
13import com.hammurapi.common.Joiner.Collector.CollectorIterator;
14import com.hammurapi.extract.CommutativeAnd;
15import com.hammurapi.extract.Extractor;
16import com.hammurapi.extract.Predicate;
17 
18/**
19 * Helper class to join data from several inputs.
20 * @author Pavel Vlasov
21 * @param <T> Join element type.
22 * @param <C> Context type passed from addInput to join.
23 * @param <R> Return type passed from join to addInput.
24 */
25public abstract class Joiner<T, C, R> {        
26        
27        /**
28         * Collector of objects to join. Declares the subset of collection
29         * operations used by the joiner: add, remove, clear and iteration.
30         * @author Pavel Vlasov
31         *
32         * @param <T> Type of collected objects.
33         */
34        public interface Collector<T> extends Iterable<T> {
35                
36                /**
37                 * Collector iterator. <code>iterator()</code> may return an iterator
38                 * of this type for collectors which hold resources, e.g. files or
39                 * database cursors. The joiner closes such iterators when it is done iterating.
40                 * @author Pavel Vlasov
41                 *
42                 * @param <T> Type of iterated objects.
43                 */
44                interface CollectorIterator<T> extends Iterator<T> {
45                        
46                        /**
47                         * Closes the iterator to release its resources.
48                         */
49                        void close();
50                        
51                }
52                
53                /**
54                 * Adds a new object to the collector.
55                 * @param obj Object to add.
56                 * @return true if the object was added to the collector.
57                 */
58                boolean add(T obj);
59                
60                /**
61                 * Removes object (last entry) from the collector.
62                 * @param obj Object to remove.
63                 * @return true on success.
64                 */
65                boolean remove(T obj);
66                
67                /**
68                 * Removes all objects from the collector.
69                 */
70                void clear();
71                
72        }
73        
74        /**
75         * Adapter for collections.
76         * @author Pavel Vlasov
77         *
78         * @param <T>
79         */
80        public static class CollectionAdapter<T> implements Collector<T> {
81                
82                private Collection<T> master;
83                
84                public CollectionAdapter(Collection<T> master) {
85                        this.master = master;
86                }
87 
88                public boolean add(T obj) {
89                        return master.add(obj);
90                }
91 
92                public void clear() {
93                        master.clear();                        
94                }
95 
96                public boolean remove(T obj) {
97                        return master.remove(obj);                        
98                }
99 
100                public Iterator<T> iterator() {
101                        return master.iterator();
102                }
103 
104                @Override
105                public String toString() {
106                        return "CollectionAdapter [master=" + master + "]";
107                }
108                
109        }
110        
111        private static final Logger logger = Logger.getLogger(Joiner.class.getName());
112        
113        /**
114         * Interface to consume inputs in join() method.
115         * @author Pavel Vlasov
116         */
117        public static class InputConsumer {
118                
119                boolean[] consumeFlags;
120 
121                InputConsumer(int size) {
122                        consumeFlags = new boolean[size];
123                }
124                
125                /**
126                 * Consumes input at given index.
127                 * @param index
128                 */
129                public void consume(int index) {
130                        consumeFlags[index] = true;
131                }
132        }
133        
134        private Collector<T>[] inputCollectors;
135        private boolean outerJoin;
136        private Class<T> inputType;
137        private boolean isFine;
138        
139        /**
140         * Predicates to be fired at a particular index.
141         */
142        private Map<Integer, List<Predicate<T, C>>> predicates = new HashMap<Integer, List<Predicate<T, C>>>();
143        
144        /**
145         * Adds join predicate.
146         * @param predicate
147         */
148        public void addPredicate(Predicate<T, C> predicate) {
149                if (predicate instanceof CommutativeAnd) {
150                        // Break down the predicate.
151                        for (Predicate<T,C> part: ((CommutativeAnd<T,C>) predicate).getParts()) {
152                                addPredicate(part);
153                        }
154                
155                }
156                
157                int maxIndex = 0;
158                for (Integer idx: predicate.parameterIndices()) {
159                        if (idx>maxIndex) {
160                                maxIndex = idx;
161                        }
162                }
163                List<Predicate<T, C>> pl = predicates.get(maxIndex);
164                if (pl==null) {
165                        pl = new ArrayList<Predicate<T, C>>();
166                        predicates.put(maxIndex, pl);
167                }
168                pl.add(predicate);
169        }
170 
171        /**
172         * Creates joiner.
173         * @param inputCollectors Input collectors.
174         * @param inputType Input type is required because of erasure of generics and T is needed at runtime.
175         * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors
176         * don't have any data.
177         */
178        protected Joiner(Collector<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) {
179                this.inputCollectors = inputCollectors;
180                this.outerJoin = outerJoin;
181                this.inputType = inputType;
182                this.isFine = logger.isLoggable(Level.FINE);
183        }
184        
185        /**
186         * Invoked at the very beginning of addInput, before the input is validated, collected
187         * or joined. Typically this method shall obtain a lock on input collectors.
188         */
189        protected abstract void startJoin();
190        
191        /**
192         * Invoked from the finally block of addInput, i.e. also when joining throws.
193         * Typically this method shall release the lock on input collectors.
194         */
195        protected abstract void endJoin();
196        
197        /**
198         * Adds input. Input is joined with other accumulated inputs.
199         *  
200         * @param index Input index. 
201         * @param input Input.
202         */
203        @SuppressWarnings("unchecked")
204        public List<R> addInput(int index, T input, C context) throws Exception {
205                startJoin();
206                try {
207                        if (isFine) {
208                                logger.fine("New join input: "+input+" at index "+index);
209                                StringBuilder sb = new StringBuilder();
210                                sb.append("Input collectors: ");
211                                for (Collector<T> c:inputCollectors) {
212                                        sb.append(c+" ");
213                                }
214                                logger.fine(sb.toString());
215                        }
216                        if (isValidInput(index, input)) {
217                                if (inputCollectors[index].add(input)) { // Join only if successfully added - for sets.
218                                        T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length);
219                                        inputs[index] = input;
220                                        InputConsumer consumer = new InputConsumer(inputCollectors.length);                        
221                                        List<R> ret = new ArrayList<R>();
222                                        Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache = new HashMap<C, Map<Extractor<T,? super Boolean,C>,? super Boolean>>(); 
223                                        join(inputs, 0, index, context, consumer, cache , ret);
224                                        if (consumer.consumeFlags[index]) { // Current input was consumed.
225                                                if (!inputCollectors[index].remove(input)) {
226                                                        throw new IllegalStateException("Cannot remove just added input");
227                                                }
228                                        }
229                                        return ret;
230                                }
231                        }
232                        
233                        return null;
234                } finally {
235                        endJoin();
236                }
237        }
238        
239        private void join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer, Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache, List<R> resultCollector) throws Exception {
240//                System.out.println(this+" Join "+currentIndex+" "+Arrays.toString(inputs));
241                if (currentIndex>0) {
242                        
243                        List<Predicate<T, C>> pl = predicates.get(currentIndex-1);
244                        if (pl!=null) {
245                                for (Predicate<T, C> predicate: pl) {
246                                        if (!predicate.extract(context, cache, inputs)) {
247                                                return;
248                                        }
249                                }
250                        }
251                        
252                        if (!partialJoin(inputs, currentIndex-1, consumer)) {
253                                return;
254                        }
255                }
256                
257                if (currentIndex==inputs.length) {
258                        for (int i=0; i<consumer.consumeFlags.length; ++i) {
259                                consumer.consumeFlags[i] = false;
260                        }
261                        resultCollector.add(join(inputs, context, consumer, inputIndex));
262                } else if (currentIndex==inputIndex) {
263                        join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
264                } else {                        
265                        if (outerJoin) {
266                                inputs[currentIndex]=null;
267                                join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
268                                if (consumer.consumeFlags[inputIndex]) { // Input was consumed
269                                        return;
270                                }
271                                for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
272                                        if (consumer.consumeFlags[i]) {
273                                                return;
274                                        }
275                                }
276                        }
277                        
278                        Iterator<T> it = inputCollectors[currentIndex].iterator();
279                        try {
280                                while (it.hasNext()) {
281                                        inputs[currentIndex]=it.next();
282                                        if (isValidInput(currentIndex, inputs[currentIndex])) {
283                                                join(inputs, currentIndex+1, inputIndex, context, consumer, cache, resultCollector);
284                                                if (consumer.consumeFlags[currentIndex]) { // This input was consumed.
285                                                        it.remove();                                        
286                                                        return;
287                                                }
288                                                
289                                                if (consumer.consumeFlags[inputIndex]) { // Input was consumed
290                                                        return;
291                                                }
292                                                
293                                                for (int i=0; i<currentIndex; ++i) { // Lower index input was consumed - get out of here.
294                                                        if (consumer.consumeFlags[i]) {
295                                                                return;
296                                                        }
297                                                }
298                                        }
299                                }
300                        } finally {
301                                if (it instanceof CollectorIterator) {
302                                        ((CollectorIterator<?>) it).close();
303                                }
304                        }
305                }                
306        }
307        
308        /**
309         * This method is invoked for every combination of valid inputs which passed the predicates and partialJoin.
310         * @param inputs Inputs
311         * @param context Join context passed from addInput
312         * @param consumer callback interface to consume inputs. All consume flags are cleared right before this method is invoked.
313         * @param activator Index of join activator, i.e. the input index passed to addInput.
314         * @return Join result. It is added to the list returned from addInput.
315         * @throws Exception
316         */
317        protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception;
318        
319        /**
320         * This method is invoked for all inputs before joining: for the new input in addInput and for every
321         * collected input picked up while joining. It is useful when inputs in internal collections may become invalid over the course of life of joiner.
322         * @param index Input index.
323         * @param input Input to validate.
324         * @return true if the input may take part in joins. This implementation always returns true.
325         */
326        protected boolean isValidInput(int index, T input) {
327                return true;
328        }
329        
330        /**
331         * This method is invoked to validate already joined inputs.
332         * If this method returns false, further joining of the current combination is abandoned.
333         * @param inputs Inputs array.
334         * @param index Index of last already joined input.
335         * @param consumer Callback interface to consume inputs. 
336         * @return true to continue joining. This implementation always returns true.
337         */
338        protected boolean partialJoin(T[] inputs, int index, InputConsumer consumer) throws Exception {
339                return true;
340        }
341        
342        /**
343         * Clears join collections.
344         */
345        public void reset() {
346                for (Collector<T> collector: inputCollectors) {
347                        collector.clear();
348                }
349        }
350 
351}

[all classes][com.hammurapi.common]
EMMA 2.0.5312 EclEmma Fix 2 (C) Vladimir Roubtsov