package com.hammurapi.common;

import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.hammurapi.extract.CommutativeAnd;
import com.hammurapi.extract.Extractor;
import com.hammurapi.extract.Predicate;

/**
 * Helper class to join data from several inputs.
 * <p>
 * Inputs are accumulated in one {@link Collector} per input index. Every call to
 * {@link #addInput(int, Object, Object)} stores the new input in its collector and
 * then combines it with the inputs accumulated at the other indices, invoking
 * {@link #join(Object[], Object, InputConsumer, int)} for each combination that is
 * not rejected by the registered predicates or by
 * {@link #partialJoin(Object[], Object, InputConsumer, int, int)}.
 *
 * @author Pavel Vlasov
 * @param <T> Join element type.
 * @param <C> Context type passed from addInput to join.
 * @param <R> Return type passed from join to addInput.
 */
public abstract class Joiner<T, C, R> {

    /**
     * Collector of objects to join. Implements a subset of collection
     * methods.
     * @author Pavel Vlasov
     *
     * @param <T> Type of collected objects.
     */
    public interface Collector<T> extends Iterable<T> {

        /**
         * Adds new object to the collector. This method shall not break currently open iterators.
         * @param obj Object to add.
         * @return true if object was added to the collector.
         */
        boolean add(T obj);

        /**
         * Removes object (last entry) from the collector.
         * @param obj Object to remove.
         * @return true if success
         */
        boolean remove(T obj);

        /**
         * Removes all objects from the collector.
         */
        void clear();

    }

    /**
     * Adapter for collections: every {@link Collector} operation is delegated
     * to the wrapped collection.
     * @author Pavel Vlasov
     *
     * @param <T> Type of collected objects.
     */
    public static class CollectionAdapter<T> implements Collector<T> {

        private final Collection<T> master;

        /**
         * @param master Collection to delegate to. It is used directly, not copied.
         */
        public CollectionAdapter(Collection<T> master) {
            this.master = master;
        }

        @Override
        public boolean add(T obj) {
            return master.add(obj);
        }

        @Override
        public void clear() {
            master.clear();
        }

        @Override
        public boolean remove(T obj) {
            return master.remove(obj);
        }

        @Override
        public Iterator<T> iterator() {
            return master.iterator();
        }

        @Override
        public String toString() {
            return "CollectionAdapter [master=" + master + "]";
        }

    }

    private static final Logger logger = Logger.getLogger(Joiner.class.getName());

    /**
     * Callback passed to join() and partialJoin() to mark inputs as consumed.
     * The joiner removes consumed inputs from their collectors.
     * @author Pavel Vlasov
     */
    public static class InputConsumer {

        /** One flag per input index; true means the input at that index was consumed. */
        boolean[] consumeFlags;

        InputConsumer(int size) {
            consumeFlags = new boolean[size];
        }

        /**
         * Consumes input at given index.
         * @param index Index of the input to mark as consumed.
         */
        public void consume(int index) {
            consumeFlags[index] = true;
        }
    }

    private final Collector<T>[] inputCollectors;
    private final boolean outerJoin;
    private final Class<T> inputType;

    /**
     * Predicates to be fired at a particular index.
     */
    private final Map<Integer, List<Predicate<T, C>>> predicates = new HashMap<Integer, List<Predicate<T, C>>>();

    /**
     * Adds join predicate.
     * <p>
     * A {@link CommutativeAnd} is broken down and each of its parts is registered
     * separately. Any other predicate is registered under the largest of its
     * parameter indices (0 if it reports no index greater than 0).
     * @param predicate Predicate to add.
     */
    public void addPredicate(Predicate<T, C> predicate) {
        if (predicate instanceof CommutativeAnd) {
            // Break down the predicate.
            for (Predicate<T, C> part : ((CommutativeAnd<T, C>) predicate).getParts()) {
                addPredicate(part);
            }
        } else {
            int maxIndex = 0;
            for (Integer idx : predicate.parameterIndices()) {
                if (idx > maxIndex) {
                    maxIndex = idx;
                }
            }
            List<Predicate<T, C>> pl = predicates.get(maxIndex);
            if (pl == null) {
                pl = new ArrayList<Predicate<T, C>>();
                predicates.put(maxIndex, pl);
            }
            pl.add(predicate);
        }
    }

    /**
     * Creates joiner.
     * @param inputCollectors Input collectors, one per input index. The array is used directly, not copied.
     * @param inputType Input type is required because of erasure of generics and T is needed at runtime.
     * @param outerJoin If true, each input addition triggers invocation of join, even if other input collectors
     * don't have any data.
     */
    protected Joiner(Collector<T>[] inputCollectors, Class<T> inputType, boolean outerJoin) {
        this.inputCollectors = inputCollectors;
        this.outerJoin = outerJoin;
        this.inputType = inputType;
    }

    /**
     * Start join is invoked before join. Typically this method shall
     * obtain a lock on input collectors.
     */
    protected abstract void startJoin();

    /**
     * This method is invoked after join is finished. Typically this method
     * shall release the lock on input collectors.
     */
    protected abstract void endJoin();

    /**
     * Adds input. Input is joined with other accumulated inputs.
     * <p>
     * The join runs between {@link #startJoin()} and {@link #endJoin()}. If the new
     * input is marked as consumed during the join it is removed from its collector
     * again before this method returns.
     *
     * @param index Input index.
     * @param input Input.
     * @param context Context passed through to predicates, partialJoin and join.
     * @return Join results, possibly empty; <code>null</code> if the input was rejected by
     * {@link #isValidInput(int, Object)} or was not added by its collector.
     * @throws IllegalStateException If the input was consumed but could not be removed from its collector.
     * @throws Exception Whatever is thrown while joining.
     */
    public List<R> addInput(int index, T input, C context) throws Exception {
        // The level is checked on every call so that runtime changes of the logger level are honored.
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("New join input: " + input + " at index " + index);
            StringBuilder sb = new StringBuilder("Input collectors: ");
            for (Collector<T> c : inputCollectors) {
                sb.append(c).append(' ');
            }
            logger.fine(sb.toString());
        }
        if (isValidInput(index, input)) {
            startJoin();
            try {
                if (inputCollectors[index].add(input)) { // Join only if successfully added - for sets.
                    // The array is created with component type inputType, so the cast to T[] holds.
                    @SuppressWarnings("unchecked")
                    T[] inputs = (T[]) Array.newInstance(inputType, inputCollectors.length);
                    inputs[index] = input;
                    InputConsumer consumer = new InputConsumer(inputCollectors.length);
                    List<R> ret = new ArrayList<R>();
                    Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache = new ConcurrentHashMap<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>>();
                    join(inputs, 0, index, context, consumer, cache, ret);
                    if (consumer.consumeFlags[index]) { // Current input was consumed.
                        if (!inputCollectors[index].remove(input)) {
                            throw new IllegalStateException("Cannot remove just added input");
                        }
                    }
                    return ret;
                }
            } finally {
                endJoin();
            }
        }
        return null;
    }

    /**
     * Recursive join step: fills the slot at <code>currentIndex</code> and descends to the next index.
     * Once all slots are processed, {@link #join(Object[], Object, InputConsumer, int)} is invoked
     * and its result is added to <code>resultCollector</code>.
     * <p>
     * This method can be overridden to implement concurrent joining, i.e. at particular indices the sub-method wraps call to the super-method
     * into a task executed in a separate thread.
     * @param inputs Array of inputs being assembled; modified in place.
     * @param currentIndex Index of the slot processed by this invocation.
     * @param inputIndex Index of the input which triggered the join (the activator).
     * @param context Join context passed from addInput.
     * @param consumer Callback to consume inputs.
     * @param cache Cache passed to the predicates.
     * @param resultCollector Collects join results; additions are synchronized on this list.
     * @throws Exception
     */
    protected void join(T[] inputs, int currentIndex, int inputIndex, C context, InputConsumer consumer, Map<C, Map<Extractor<T, ? super Boolean, C>, ? super Boolean>> cache, List<R> resultCollector) throws Exception {
        if (currentIndex < inputs.length) {
            // NOTE(review): predicates and partialJoin run before inputs[currentIndex] is assigned by this
            // invocation; unless currentIndex == inputIndex the slot holds null or a value left by an
            // earlier iteration. Confirm this is the intended evaluation point for predicates registered
            // under their highest parameter index.
            List<Predicate<T, C>> pl = predicates.get(currentIndex);
            if (pl != null) {
                for (Predicate<T, C> predicate : pl) {
                    if (!predicate.extract(context, cache, inputs)) {
                        return;
                    }
                }
            }

            if (!partialJoin(inputs, context, consumer, currentIndex, inputIndex)) {
                return;
            }
        }

        if (currentIndex == inputs.length) {
            // All slots processed: clear the flags so that after the call they reflect this combination only.
            for (int i = 0; i < consumer.consumeFlags.length; ++i) {
                consumer.consumeFlags[i] = false;
            }
            R joinResult = join(inputs, context, consumer, inputIndex);
            synchronized (resultCollector) { // For concurrent joins.
                resultCollector.add(joinResult);
            }
        } else if (currentIndex == inputIndex) {
            // The activator slot was already set by addInput - nothing to iterate.
            join(inputs, currentIndex + 1, inputIndex, context, consumer, cache, resultCollector);
        } else {
            if (outerJoin) {
                // Outer join: first try the combination with no input at this index.
                inputs[currentIndex] = null;
                join(inputs, currentIndex + 1, inputIndex, context, consumer, cache, resultCollector);
                if (isActivatorOrLowerConsumed(consumer, currentIndex, inputIndex)) {
                    return;
                }
            }

            Iterator<T> it = inputCollectors[currentIndex].iterator();
            try {
                while (it.hasNext()) {
                    inputs[currentIndex] = it.next();
                    if (isValidInput(currentIndex, inputs[currentIndex])) {
                        join(inputs, currentIndex + 1, inputIndex, context, consumer, cache, resultCollector);
                        // NOTE(review): flags are cleared only when all slots are processed; if the call
                        // above returned early this flag may still hold a value set for an earlier
                        // combination - confirm this is intended.
                        if (consumer.consumeFlags[currentIndex]) { // This input was consumed.
                            it.remove();
                            return;
                        }

                        if (isActivatorOrLowerConsumed(consumer, currentIndex, inputIndex)) {
                            return;
                        }
                    } else {
                        if (consumer.consumeFlags[currentIndex]) { // This input is not valid anymore.
                            it.remove();
                            return;
                        }
                    }
                }
            } finally {
                if (it instanceof AutoCloseable) {
                    ((AutoCloseable) it).close();
                }
            }
        }
    }

    /**
     * Tells whether the activator input or any input at an index below currentIndex is flagged as consumed.
     */
    private static boolean isActivatorOrLowerConsumed(InputConsumer consumer, int currentIndex, int inputIndex) {
        if (consumer.consumeFlags[inputIndex]) { // Input was consumed
            return true;
        }
        for (int i = 0; i < currentIndex; ++i) { // Lower index input was consumed - get out of here.
            if (consumer.consumeFlags[i]) {
                return true;
            }
        }
        return false;
    }

    /**
     * This method is invoked for every combination of valid inputs.
     * @param inputs Inputs
     * @param context Join context passed from addInput
     * @param consumer callback interface to consume inputs.
     * @param activator Index of join activator.
     * @return Join result.
     * @throws Exception
     */
    protected abstract R join(T[] inputs, C context, InputConsumer consumer, int activator) throws Exception;

    /**
     * This method is invoked for all inputs before joining. It is useful for situation when
     * inputs in internal collections may become invalid over the course of life of joiner.
     * This implementation accepts every input.
     * @param index Input index.
     * @param input Input to validate.
     * @return true if the input shall take part in joining.
     */
    protected boolean isValidInput(int index, T input) {
        return true;
    }

    /**
     * This method is invoked to validate already joined inputs.
     * If this method returns false, further joining is abandoned.
     * This implementation always returns true.
     * @param inputs Inputs array.
     * @param context Join context passed from addInput.
     * @param consumer Callback interface to consume inputs.
     * @param index Index of the slot the join is about to process (currentIndex of the recursive join).
     * @param activator Index of join activator.
     * @return true to continue joining, false to abandon the current combination.
     * @throws Exception
     */
    protected boolean partialJoin(T[] inputs, C context, InputConsumer consumer, int index, int activator) throws Exception {
        return true;
    }

    /**
     * Clears join collections. startJoin() and endJoin() are not invoked by this method.
     */
    public void reset() {
        for (Collector<T> collector : inputCollectors) {
            collector.clear();
        }
    }

}