1 | package com.hammurapi.store; |
2 | |
3 | import java.io.Serializable; |
4 | import java.util.ArrayList; |
5 | import java.util.Collection; |
6 | import java.util.Collections; |
7 | import java.util.Comparator; |
8 | import java.util.Iterator; |
9 | import java.util.LinkedList; |
10 | import java.util.List; |
11 | import java.util.Map; |
12 | import java.util.concurrent.Callable; |
13 | import java.util.concurrent.ExecutorService; |
14 | import java.util.concurrent.Future; |
15 | import java.util.concurrent.locks.ReadWriteLock; |
16 | |
17 | import com.hammurapi.common.Util; |
18 | import com.hammurapi.extract.CommutativeOr; |
19 | import com.hammurapi.extract.ComparisonResult; |
20 | import com.hammurapi.extract.ComparisonResult.Type; |
21 | import com.hammurapi.extract.CompositePredicate; |
22 | import com.hammurapi.extract.Extractor; |
23 | import com.hammurapi.extract.IndexedExtractor; |
24 | import com.hammurapi.extract.Predicate; |
25 | import com.hammurapi.extract.True; |
26 | |
27 | /** |
28 | * Abstract base class for object stores. |
29 | * @author Pavel Vlasov |
30 | * |
31 | * @param <T> |
32 | * @param <S> Self-type - sub-type of store. Generic parameter for predicates, extractors, update tasks and views. |
33 | * S should be an interface for master store, deputy store and views to implement. |
34 | */ |
35 | public abstract class AbstractStore<T, PK, S extends Store<T,PK,S>> implements Store<T, PK, S> { |
36 | |
37 | /** |
38 | * @return Store for the mode with primary key. |
39 | */ |
40 | protected abstract Map<PK, Handle<T, PK, S>> getPkStore(); |
41 | |
42 | /** |
43 | * Store for the mode without primary key or pkStore values collection. |
44 | */ |
45 | protected abstract Collection<Handle<T, PK, S>> getNoPkStore(); |
46 | |
47 | /** |
48 | * @return Executor service to parallel processing. |
49 | */ |
50 | protected abstract ExecutorService getExecutorService(); |
51 | |
52 | /** |
53 | * Creates handle. |
54 | */ |
55 | protected abstract Handle<T, PK, S> createHandle(T obj, PK primaryKey, Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache, Predicate<T, S>[] validators); |
56 | |
57 | /** |
58 | * Creates collection for index entries |
59 | * @param unique Collection should be unique (set) |
60 | * @param ordered Collection should be ordered by the comparator or by natural order. |
61 | * @param comparator |
62 | * @return |
63 | */ |
64 | protected abstract Collection<Handle<T, PK, S>> createIndexCollection(boolean unique, boolean ordered, Comparator<T> comparator); |
65 | |
66 | protected Handle<T, PK, S> createHandle(T obj, Predicate<T, S>[] validators) { |
67 | return createHandle(obj, null, createCache(), validators); |
68 | } |
69 | |
70 | /** |
71 | * @return Cache for extractors. |
72 | */ |
73 | protected abstract Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> createCache(); |
74 | |
75 | /** |
76 | * Creates a store which encapsulates current call state (e.g. locks currently held) and can be |
77 | * passed to executor for asynchronous execution. Delegates operate on behalf of the master |
78 | * store. They don't acquire locks if the master store already holds them. |
79 | * If the master store holds read lock then attempt of a deputy to acquire write lock will result in |
80 | * StoreException. |
81 | * @return |
82 | */ |
83 | protected abstract S createDeputy(); |
84 | protected abstract ReadWriteLock createMasterLock(); |
85 | |
86 | /** |
87 | * Invoked when some handles have been removed. |
88 | * This operation shall remove invalidated handles from store and index collections. |
89 | * This method is invoked outside of the write lock. |
90 | * @param removed Number of removed handles. |
91 | */ |
92 | protected abstract void onRemoved(int removed); |
93 | |
94 | /* |
95 | * Implementation note - removal of objects is done by marking Handle as removed. Handles may or may not be removed from store and index collections |
96 | * upon marking them as removed. Therefore there should be a "garbage collection" mechanism for removal of invalid handles. |
97 | */ |
98 | |
99 | @SuppressWarnings("unchecked") |
100 | @Override |
101 | public Handle<T, PK, S> put(T obj, Predicate<T, S>... validators) { |
102 | // Evaluate validators. |
103 | for (Predicate<T, S> v: validators) { |
104 | if (!v.extract((S) this, null, Util.wrap(obj))) { |
105 | return null; |
106 | } |
107 | } |
108 | writeLock().lock(); |
109 | try { |
110 | if (getPrimaryKeyExtractor()==null) { |
111 | Handle<T, PK, S> ret = createHandle(obj, validators); |
112 | if (getNoPkStore().add(ret)) { |
113 | updateIndices(ret, true); |
114 | } else { |
115 | // Not unique |
116 | for (Handle<T, PK, S> hi: getNoPkStore()) { |
117 | if (ret.equals(hi)) { |
118 | return hi; |
119 | } |
120 | } |
121 | } |
122 | return ret; |
123 | } else { |
124 | Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache = createCache(); |
125 | PK primaryKey = getPrimaryKeyExtractor().extract((S) this, cache, Util.wrap(obj)); |
126 | Handle<T, PK, S> hi = getPkStore().get(primaryKey); |
127 | if (hi==null) { |
128 | hi = createHandle(obj, primaryKey, cache, validators); |
129 | updateIndices(hi, true); |
130 | getPkStore().put(primaryKey, hi); |
131 | } else { |
132 | hi.update(obj); |
133 | } |
134 | return hi; |
135 | } |
136 | } finally { |
137 | writeLock().unlock(); |
138 | } |
139 | } |
140 | |
141 | @Override |
142 | public Iterable<T> getAll() { |
143 | readLock().lock(); |
144 | try { |
145 | Collection<T> ret = new ArrayList<T>(); |
146 | for (Handle<T, PK, S> hi: getNoPkStore()) { |
147 | if (hi.isValid()) { |
148 | ret.add(hi.get()); |
149 | } |
150 | } |
151 | return ret; |
152 | } finally { |
153 | readLock().unlock(); |
154 | } |
155 | } |
156 | |
157 | @Override |
158 | public T getByPrimaryKey(PK primaryKey) { |
159 | readLock().lock(); |
160 | try { |
161 | if (getPrimaryKeyExtractor()==null) { |
162 | throw new IllegalStateException("Object store is configured without primary key"); |
163 | } |
164 | Handle<T, PK, S> hi = getPkStore().get(primaryKey); |
165 | if (hi==null || !hi.isValid()) { |
166 | // TODO - get from the backing store, update indices |
167 | return null; |
168 | } |
169 | return hi.get(); |
170 | } finally { |
171 | readLock().unlock(); |
172 | } |
173 | } |
174 | |
175 | @Override |
176 | public Iterable<T> get(Predicate<T, S> selector) { |
177 | Collection<T> ret = new ArrayList<T>(); |
178 | for (Handle<T, PK, S> h: getHandles(selector)) { |
179 | ret.add(h.get()); |
180 | } |
181 | return ret; |
182 | } |
183 | |
184 | protected class SelectorTask implements Callable<Handle<T, PK, S>>, Serializable { |
185 | |
186 | private Predicate<T, S> selector; |
187 | private Handle<T, PK, S> handle; |
188 | |
189 | public SelectorTask(Handle<T, PK, S> handle, Predicate<T, S> selector) { |
190 | this.handle = handle; |
191 | this.selector = selector; |
192 | } |
193 | |
194 | /** |
195 | * Returns object if predicate matches. |
196 | */ |
197 | @Override |
198 | public Handle<T, PK, S> call() throws Exception { |
199 | if (handle.isValid() && (selector==null || handle.extract(selector))) { |
200 | return handle; |
201 | } |
202 | return null; |
203 | } |
204 | |
205 | } |
206 | |
207 | /** |
208 | * Result of applying index optimizations to data manipulation parameters. |
209 | * @author Pavel Vlasov |
210 | * |
211 | */ |
212 | private class IndexizationResult { |
213 | private Iterable<Handle<T, PK, S>> source; |
214 | private Predicate<T, S> selector; |
215 | private boolean ordered; |
216 | |
217 | public IndexizationResult( |
218 | Iterable<Handle<T, PK, S>> source, |
219 | Predicate<T, S> selector, |
220 | boolean ordered) { |
221 | super(); |
222 | this.source = source; |
223 | this.selector = selector; |
224 | this.ordered = ordered; |
225 | } |
226 | |
227 | /** |
228 | * @return Data to iterate over for further refinement. |
229 | */ |
230 | public Iterable<Handle<T, PK, S>> getSource() { |
231 | return source; |
232 | } |
233 | |
234 | /** |
235 | * @return Refined selector with parts addressed by indices already removed. |
236 | */ |
237 | public Predicate<T, S> getSelector() { |
238 | return selector; |
239 | } |
240 | |
241 | /** |
242 | * @return true if source is already ordered as required. |
243 | */ |
244 | public boolean isOrdered() { |
245 | return ordered; |
246 | } |
247 | |
248 | } |
249 | |
250 | private ComparisonResult.Type comparePredicates( |
251 | Predicate<T, S> selector, |
252 | AbstractIndex<T,PK,?,S> index |
253 | ) { |
254 | |
255 | if (selector==null) { |
256 | if (index.getPredicate()==null || True.getInstance().equals(index.getPredicate())) { |
257 | return Type.LESS_RESTRICTIVE; |
258 | } |
259 | return Type.NOT_EQUAL; |
260 | } |
261 | |
262 | if (index.getPredicate()==null || True.getInstance().equals(index.getPredicate())) { |
263 | return Type.LESS_RESTRICTIVE; |
264 | } |
265 | |
266 | ComparisonResult cr = index.getPredicate().compareTo(selector); |
267 | if (!cr.isOneToOneMapping()) { |
268 | return Type.NOT_EQUAL; |
269 | } |
270 | |
271 | switch (cr.getType()) { |
272 | case EQUAL: |
273 | case LESS_RESTRICTIVE: |
274 | return cr.getType(); |
275 | default: |
276 | return Type.NOT_EQUAL; |
277 | } |
278 | } |
279 | |
280 | /** |
281 | * @param <V> |
282 | * @param extractor |
283 | * @param comparator |
284 | * @return |
285 | */ |
286 | private <V> boolean compatibleOrder(Extractor<T, V, S> extractor, Comparator<V> comparator, AbstractIndex<T,PK,V,S> index) { |
287 | if (index.isOrdered()) { |
288 | if ((extractor==null && index.getExtractor()==null) || extractor.equals(index.getExtractor())) { |
289 | if ((comparator==null && index.getComparator()==null) || comparator.equals(index.getComparator())) { |
290 | return true; |
291 | } |
292 | } |
293 | } |
294 | return false; |
295 | } |
296 | |
297 | /** |
298 | * Apply indexing optimizations. |
299 | * @return |
300 | */ |
301 | private <V> IndexizationResult indexize( |
302 | Predicate<T, S> selector, |
303 | final Extractor<T, V, S> extractor, |
304 | final boolean ordered, |
305 | Comparator<V> comparator) { |
306 | |
307 | class Candidate implements Comparable<Candidate> { |
308 | AbstractIndex<T,PK,?,S> index; |
309 | ComparisonResult.Type type; |
310 | private boolean offersOrdering; |
311 | |
312 | public Candidate(AbstractIndex<T, PK, ?, S> index, Type type, boolean offersOrdering) { |
313 | super(); |
314 | this.index = index; |
315 | this.type = type; |
316 | this.offersOrdering = offersOrdering; |
317 | } |
318 | |
319 | @Override |
320 | public int compareTo(Candidate o) { |
321 | |
322 | // If ordered, indices with compatible order come first, if not, unordered come first |
323 | if (ordered) { |
324 | if (offersOrdering) { |
325 | if (!o.offersOrdering) { |
326 | return -1; |
327 | } |
328 | } else { |
329 | if (o.offersOrdering) { |
330 | return 1; |
331 | } |
332 | } |
333 | } else { |
334 | if (index.isOrdered()) { |
335 | if (!o.index.isOrdered()) { |
336 | return 1; |
337 | } |
338 | } else { |
339 | if (o.index.isOrdered()) { |
340 | return -1; |
341 | } |
342 | } |
343 | } |
344 | |
345 | // Predicates equal to selector come first. |
346 | if (!o.type.equals(type)) { |
347 | return Type.EQUAL.equals(type) ? -1 : 1; |
348 | } |
349 | |
350 | // Less restrictive predicates come after more restrictive. |
351 | if (index.getPredicate()==null) { |
352 | if (o.index.getPredicate()!=null) { |
353 | return 1; |
354 | } |
355 | } else { |
356 | if (o.index.getPredicate()==null) { |
357 | return -1; |
358 | } |
359 | |
360 | ComparisonResult cr = index.getPredicate().compareTo(o.index.getPredicate()); |
361 | if (cr.isOneToOneMapping()) { |
362 | switch (cr.getType()) { |
363 | case LESS_RESTRICTIVE: |
364 | return 1; |
365 | case MORE_RESTRICTIVE: |
366 | return -1; |
367 | } |
368 | } |
369 | } |
370 | |
371 | if (index.isUnique()) { |
372 | if (!o.index.isUnique()) { |
373 | return -1; |
374 | } |
375 | } else { |
376 | if (o.index.isUnique()) { |
377 | return 1; |
378 | } |
379 | } |
380 | |
381 | // More costly index comes first |
382 | Extractor<T, ?, S> indexExtractor = index.getExtractor(); |
383 | Extractor<T, ?, S> otherExtractor = o.index.getExtractor(); |
384 | double indexCost = indexExtractor==null ? 0 : indexExtractor.getCost(); |
385 | double otherCost = otherExtractor==null ? 0 : otherExtractor.getCost(); |
386 | if (indexCost>0 && otherCost>0) { |
387 | if (indexCost*o.index.getGranularity()>otherCost*index.getGranularity()) { |
388 | return -1; |
389 | } |
390 | if (indexCost*o.index.getGranularity()<otherCost*index.getGranularity()) { |
391 | return -1; |
392 | } |
393 | } |
394 | |
395 | int gDelta = o.index.getGranularity() - index.getGranularity(); |
396 | if (gDelta!=0) { |
397 | return gDelta; |
398 | } |
399 | |
400 | // Apply synchronous first, lazy last |
401 | int typeCmp = type.compareTo(o.type); |
402 | if (typeCmp!=0) { |
403 | return typeCmp; |
404 | } |
405 | |
406 | return hashCode() - o.hashCode(); |
407 | } |
408 | |
409 | } |
410 | |
411 | List<Candidate> candidates = new ArrayList<Candidate>(); |
412 | for (AbstractIndex<T,PK,?,S> candidate: getIndices()) { |
413 | ComparisonResult.Type type = comparePredicates(selector, candidate); |
414 | if (!Type.NOT_EQUAL.equals(type)) { |
415 | // Add no-predicate indexes only if they offer ordering. |
416 | @SuppressWarnings("unchecked") |
417 | boolean offersOrdering = ordered && compatibleOrder(extractor, comparator, (AbstractIndex<T,PK,V,S>) candidate); |
418 | if (candidate.getPredicate()!=null || offersOrdering) { |
419 | candidates.add(new Candidate(candidate, type, offersOrdering)); |
420 | } |
421 | } |
422 | } |
423 | |
424 | if (!candidates.isEmpty()) { |
425 | Collections.sort(candidates); |
426 | Candidate indexToUse = candidates.get(0); |
427 | if (Type.EQUAL.equals(indexToUse.type)) { |
428 | return new IndexizationResult(indexToUse.index.getHandles(), null, indexToUse.offersOrdering); |
429 | } else if (selector instanceof CompositePredicate) { |
430 | @SuppressWarnings("unchecked") |
431 | Predicate<T, S> newSelector = ((CompositePredicate<T, ?, S, ?>) selector).remove(indexToUse.index.getPredicate()); |
432 | return new IndexizationResult(indexToUse.index.getHandles(), newSelector, indexToUse.offersOrdering); |
433 | } |
434 | // Less restrictive, not composite |
435 | return new IndexizationResult(indexToUse.index.getHandles(), selector, indexToUse.offersOrdering); |
436 | } |
437 | |
438 | /** |
439 | * Or selector - we can use multiple indices to select, the we have to sort anyway. |
440 | */ |
441 | if (selector instanceof CommutativeOr && ((CommutativeOr) selector).getParts().size()>1) { |
442 | Collection<Future<IndexizationResult>> indexizationResults = new LinkedList<Future<IndexizationResult>>(); |
443 | for (Predicate<T, S> part: ((CommutativeOr<T,S>) selector).getParts()) { |
444 | indexizationResults.add(getExecutorService().submit(new IndexizationTask<V>(part, extractor))); |
445 | } |
446 | |
447 | Collection<Handle<T, PK, S>> ret = new LinkedList<Handle<T, PK, S>>(); |
448 | Collection<Future<Handle<T, PK, S>>> fret = new LinkedList<Future<Handle<T, PK, S>>>(); |
449 | |
450 | for (Future<IndexizationResult> fir: indexizationResults) { |
451 | try { |
452 | IndexizationResult ir = fir.get(); |
453 | if (ir.getSelector()==null || True.getInstance().equals(ir.getSelector())) { |
454 | for (Handle<T, PK, S> h: ir.getSource()) { |
455 | ret.add(h); |
456 | } |
457 | } else { |
458 | for (Handle<T, PK, S> h: ir.getSource()) { |
459 | fret.add(getExecutorService().submit(new SelectorTask(h, ir.getSelector()))); |
460 | } |
461 | } |
462 | } catch (Exception e) { |
463 | throw new StoreException(e); |
464 | } |
465 | } |
466 | |
467 | for (Future<Handle<T, PK, S>> fh: fret) { |
468 | try { |
469 | Handle<T, PK, S> h = fh.get(); |
470 | if (h!=null) { |
471 | ret.add(h); |
472 | } |
473 | } catch (Exception e) { |
474 | throw new StoreException(e); |
475 | } |
476 | } |
477 | |
478 | return new IndexizationResult(ret, null, false); |
479 | } |
480 | |
481 | |
482 | return new IndexizationResult(getNoPkStore(), selector, false); |
483 | } |
484 | |
485 | protected class IndexizationTask<V> implements Callable<IndexizationResult> { |
486 | |
487 | private Predicate<T, S> selector; |
488 | private Extractor<T, V, S> extractor; |
489 | |
490 | public IndexizationTask(Predicate<T, S> selector, Extractor<T, V, S> extractor) { |
491 | this.selector = selector; |
492 | this.extractor = extractor; |
493 | } |
494 | |
495 | @Override |
496 | public IndexizationResult call() throws Exception { |
497 | return indexize(selector, extractor, false, null); |
498 | } |
499 | |
500 | } |
501 | |
502 | /** |
503 | * @param selector |
504 | * @return |
505 | */ |
506 | protected Iterable<Handle<T, PK, S>> getHandles(Predicate<T, S> selector) { |
507 | readLock().lock(); |
508 | try { |
509 | IndexizationResult ir = indexize(selector, null, false, null); |
510 | Collection<Future<Handle<T, PK, S>>> selectResults = new ArrayList<Future<Handle<T, PK, S>>>(); |
511 | for (Handle<T, PK, S> h: ir.getSource()) { |
512 | selectResults.add(getExecutorService().submit(new SelectorTask(h, ir.getSelector()))); |
513 | } |
514 | Collection<Handle<T, PK, S>> ret = new ArrayList<Handle<T, PK, S>>(); |
515 | for (Future<Handle<T, PK, S>> f: selectResults) { |
516 | try { |
517 | Handle<T, PK, S> result = f.get(); |
518 | if (result!=null) { |
519 | ret.add(result); |
520 | } |
521 | } catch (Exception e) { |
522 | throw new StoreException(e); |
523 | } |
524 | } |
525 | return ret; |
526 | } finally { |
527 | readLock().unlock(); |
528 | } |
529 | } |
530 | |
531 | protected class ExtractorTask<V> implements Callable<V>, Serializable { |
532 | |
533 | private Predicate<T, S> selector; |
534 | private Handle<T, PK, S> handle; |
535 | private Extractor<T, V, S> extractor; |
536 | |
537 | public ExtractorTask(Handle<T, PK, S> handle, Predicate<T, S> selector, Extractor<T, V, S> extractor) { |
538 | this.handle = handle; |
539 | this.selector = selector; |
540 | this.extractor = extractor; |
541 | } |
542 | |
543 | /** |
544 | * Returns object if predicate matches. |
545 | */ |
546 | @Override |
547 | public V call() throws Exception { |
548 | if (handle.isValid() && (selector==null || handle.extract(selector))) { |
549 | if (extractor==null) { |
550 | return ((V) handle.get()); |
551 | } |
552 | return handle.extract(extractor); |
553 | } |
554 | return null; |
555 | } |
556 | |
557 | } |
558 | |
559 | @Override |
560 | public <V> Iterable<V> get( |
561 | Predicate<T, S> selector, |
562 | Extractor<T, V, S> extractor, |
563 | boolean ordered, |
564 | Comparator<V> comparator) { |
565 | |
566 | readLock().lock(); |
567 | try { |
568 | IndexizationResult ir = indexize(selector, extractor, ordered, comparator); |
569 | Collection<Future<V>> extractResults = new ArrayList<Future<V>>(); |
570 | for (Handle<T, PK, S> h: ir.getSource()) { |
571 | extractResults.add(getExecutorService().submit(new ExtractorTask<V>(h, ir.getSelector(), extractor))); |
572 | } |
573 | List<V> ret = new ArrayList<V>(); |
574 | for (Future<V> f: extractResults) { |
575 | try { |
576 | V result = f.get(); |
577 | if (result!=null) { |
578 | ret.add(result); |
579 | } |
580 | } catch (Exception e) { |
581 | throw new StoreException(e); |
582 | } |
583 | } |
584 | if (ordered && !ir.isOrdered()) { |
585 | if (comparator==null) { |
586 | // Trick. |
587 | Collections.sort(ret, new NaturalComparator<V>()); |
588 | } else { |
589 | Collections.sort(ret, comparator); |
590 | } |
591 | } |
592 | return ret; |
593 | } finally { |
594 | readLock().unlock(); |
595 | } |
596 | } |
597 | |
598 | protected class ValueSelectorTask<V> implements Callable<V>, Serializable { |
599 | |
600 | private Predicate<V, S> selector; |
601 | private V value; |
602 | |
603 | public ValueSelectorTask(V value, Predicate<V, S> selector) { |
604 | this.value = value; |
605 | this.selector = selector; |
606 | } |
607 | |
608 | /** |
609 | * Returns object if predicate matches. |
610 | */ |
611 | @Override |
612 | public V call() throws Exception { |
613 | if (selector.extract(createDeputy(), null, Util.wrap(value))) { |
614 | return value; |
615 | } |
616 | return null; |
617 | } |
618 | |
619 | } |
620 | |
621 | @Override |
622 | public <V> Iterable<V> getMultiple( |
623 | Predicate<T, S> selector, |
624 | Extractor<T, Iterable<V>, S> extractor, |
625 | Predicate<V, S> valueSelector, |
626 | boolean ordered, |
627 | Comparator<V> comparator) { |
628 | readLock().lock(); |
629 | try { |
630 | // IndexizationResult ir = indexize(selector, extractor, ordered, comparator); |
631 | Collection<Future<Iterable<V>>> extractResults = new ArrayList<Future<Iterable<V>>>(); |
632 | for (Handle<T, PK, S> h: getNoPkStore()) { |
633 | extractResults.add(getExecutorService().submit(new ExtractorTask<Iterable<V>>(h, selector, extractor))); |
634 | } |
635 | List<V> ret = new ArrayList<V>(); |
636 | if (valueSelector==null) { |
637 | for (Future<Iterable<V>> f: extractResults) { |
638 | try { |
639 | Iterable<V> result = f.get(); |
640 | if (result!=null) { |
641 | for (V v: result) { |
642 | ret.add(v); |
643 | } |
644 | } |
645 | } catch (Exception e) { |
646 | throw new StoreException(e); |
647 | } |
648 | } |
649 | } else { |
650 | List<Future<V>> fRet = new LinkedList<Future<V>>(); |
651 | for (Future<Iterable<V>> f: extractResults) { |
652 | try { |
653 | Iterable<V> result = f.get(); |
654 | if (result!=null) { |
655 | for (V v: result) { |
656 | fRet.add(getExecutorService().submit(new ValueSelectorTask<V>(v, valueSelector))); |
657 | } |
658 | } |
659 | } catch (Exception e) { |
660 | throw new StoreException(e); |
661 | } |
662 | } |
663 | for (Future<V> f: fRet) { |
664 | try { |
665 | V result = f.get(); |
666 | if (result!=null) { |
667 | ret.add(result); |
668 | } |
669 | } catch (Exception e) { |
670 | throw new StoreException(e); |
671 | } |
672 | } |
673 | } |
674 | if (ordered) { |
675 | if (comparator==null) { |
676 | // Trick. |
677 | Collections.sort(ret, new NaturalComparator<V>()); |
678 | } else { |
679 | Collections.sort(ret, comparator); |
680 | } |
681 | } |
682 | return ret; |
683 | } finally { |
684 | readLock().unlock(); |
685 | } |
686 | } |
687 | |
688 | @Override |
689 | public void clear() { |
690 | writeLock().lock(); |
691 | try { |
692 | // Marking handles as removed. |
693 | for (Handle<T, PK, S> h: getNoPkStore()) { |
694 | h.remove(); |
695 | } |
696 | // TODO - clear indices, remove from the backing store. |
697 | if (getPrimaryKeyExtractor()==null) { |
698 | getNoPkStore().clear(); |
699 | } else { |
700 | getPkStore().clear(); |
701 | } |
702 | } finally { |
703 | writeLock().unlock(); |
704 | } |
705 | } |
706 | |
707 | @Override |
708 | public boolean remove(T obj) { |
709 | int cleanedUp = 0; |
710 | writeLock().lock(); |
711 | try { |
712 | Iterator<Handle<T, PK, S>> it = getNoPkStore().iterator(); |
713 | while (it.hasNext()) { |
714 | Handle<T, PK, S> next = it.next(); |
715 | if (obj.equals(next.get())) { |
716 | next.remove(); // Handles shall just invalidate self without modifying the containing collection. |
717 | it.remove(); |
718 | // TODO remove from indices, remove from the backing store. |
719 | return true; |
720 | } |
721 | // Clean up also directly removed (through handle.remove()) entries. |
722 | if (!next.isValid()) { |
723 | it.remove(); |
724 | ++cleanedUp; |
725 | } |
726 | } |
727 | return false; |
728 | } finally { |
729 | writeLock().unlock(); |
730 | onRemoved(-cleanedUp); |
731 | } |
732 | } |
733 | |
734 | public boolean removeByPrimaryKey(PK primaryKey) { |
735 | writeLock().lock(); |
736 | try { |
737 | if (getPrimaryKeyExtractor()==null) { |
738 | throw new IllegalStateException("Object store is configured without primary key"); |
739 | } |
740 | Handle<T, PK, S> hi = getPkStore().remove(primaryKey); |
741 | if (hi==null || !hi.isValid()) { |
742 | // TODO - get from the backing store, update indices |
743 | return false; |
744 | } |
745 | hi.remove(); |
746 | return true; |
747 | } finally { |
748 | writeLock().unlock(); |
749 | } |
750 | |
751 | }; |
752 | |
753 | protected class RemoveTask implements Callable<Boolean>, Serializable { |
754 | |
755 | private Predicate<T, S> selector; |
756 | private Handle<T, PK, S> handle; |
757 | |
758 | public RemoveTask(Handle<T, PK, S> handle, Predicate<T, S> selector) { |
759 | this.handle = handle; |
760 | this.selector = selector; |
761 | } |
762 | |
763 | /** |
764 | * Removes object and returns true if predicate matches. |
765 | */ |
766 | @Override |
767 | public Boolean call() throws Exception { |
768 | if (handle.isValid() && handle.extract(selector)) { |
769 | handle.remove(); // TODO - remove from the backing store, shall be handled by handle.remove(). |
770 | return true; |
771 | } |
772 | return false; |
773 | } |
774 | |
775 | } |
776 | |
777 | @Override |
778 | public int remove(Predicate<T, S> selector) { |
779 | int ret = 0; |
780 | writeLock().lock(); |
781 | try { |
782 | IndexizationResult ir = indexize(selector, null, false, null); |
783 | Collection<Future<Boolean>> removeResults = new ArrayList<Future<Boolean>>(); |
784 | for (Handle<T, PK, S> h: ir.getSource()) { |
785 | removeResults.add(getExecutorService().submit(new RemoveTask(h, ir.getSelector()))); |
786 | } |
787 | for (Future<Boolean> f: removeResults) { |
788 | try { |
789 | if (f.get()) { |
790 | ++ret; |
791 | } |
792 | } catch (Exception e) { |
793 | throw new StoreException(e); |
794 | } |
795 | } |
796 | return ret; |
797 | } finally { |
798 | writeLock().unlock(); |
799 | onRemoved(ret); |
800 | } |
801 | } |
802 | |
803 | protected class UpdateCallable implements Callable<Boolean>, Serializable { |
804 | |
805 | private Predicate<T, S> selector; |
806 | private Handle<T, PK, S> handle; |
807 | private UpdateTask<T, PK, S> updater; |
808 | private S storeDeputy; |
809 | |
810 | public UpdateCallable(Handle<T, PK, S> handle, Predicate<T, S> selector, UpdateTask<T,PK,S> updater, S storeDeputy) { |
811 | this.handle = handle; |
812 | this.selector = selector; |
813 | this.updater = updater; |
814 | this.storeDeputy = storeDeputy; |
815 | } |
816 | |
817 | /** |
818 | * Updates object and returns true if predicate matches. |
819 | */ |
820 | @Override |
821 | public Boolean call() throws Exception { |
822 | if (handle.isValid() && handle.extract(selector)) { |
823 | updater.execute(storeDeputy, handle); // TODO - update backing store. |
824 | return true; |
825 | } |
826 | return false; |
827 | } |
828 | |
829 | } |
830 | |
831 | @Override |
832 | public int update(Predicate<T, S> selector, UpdateTask<T, PK, S> updater) { |
833 | writeLock().lock(); |
834 | try { |
835 | IndexizationResult ir = indexize(selector, null, false, null); |
836 | Collection<Future<Boolean>> updateResults = new ArrayList<Future<Boolean>>(); |
837 | for (Handle<T, PK, S> h: ir.getSource()) { |
838 | updateResults.add(getExecutorService().submit(new UpdateCallable(h, ir.getSelector(), updater, createDeputy()))); |
839 | } |
840 | int ret = 0; |
841 | for (Future<Boolean> f: updateResults) { |
842 | try { |
843 | if (f.get()) { |
844 | ++ret; |
845 | } |
846 | } catch (Exception e) { |
847 | throw new StoreException(e); |
848 | } |
849 | } |
850 | return ret; |
851 | } finally { |
852 | writeLock().unlock(); |
853 | } |
854 | } |
855 | |
856 | protected abstract Collection<AbstractIndex<T,PK, ?, S>> getIndices(); |
857 | |
858 | protected abstract <V> AbstractIndex<T,PK,V,S> createIndex( |
859 | Predicate<T, S> predicate, |
860 | Extractor<T, V, S> extractor, |
861 | Index.Type type, |
862 | boolean ordered, |
863 | Comparator<V> comparator); |
864 | |
865 | @SuppressWarnings("unchecked") |
866 | @Override |
867 | public <V> Index<T,PK,V,S> addIndex( |
868 | Predicate<T, S> predicate, |
869 | Extractor<T, V, S> extractor, |
870 | Index.Type type, |
871 | boolean ordered, |
872 | Comparator<V> comparator) { |
873 | |
874 | writeLock().lock(); |
875 | try { |
876 | for (AbstractIndex<T, PK, ?, S> index: getIndices()) { |
877 | if ((predicate==null && index.getPredicate()==null) || (predicate!=null && predicate.equals(index.getPredicate()))) { |
878 | if ((extractor == null && index.getExtractor()==null) || (extractor!=null && extractor.equals(index))) { |
879 | if (ordered) { |
880 | if (!index.isOrdered()) { |
881 | // Upgrade index to ordered |
882 | ((AbstractIndex<T,PK,V,S>) index).upgrade(type, ordered, comparator); |
883 | return (Index<T, PK, V, S>) index; |
884 | } else { |
885 | if ((index.getComparator()==null && comparator==null) || (comparator!=null && comparator.equals(index.getComparator()))) { |
886 | if (index.type.compareTo(type)>0) { |
887 | ((AbstractIndex<T,PK,V,S>) index).upgrade(type, ordered, comparator); |
888 | } |
889 | return (Index<T, PK, V, S>) index; |
890 | } |
891 | } |
892 | } else { |
893 | // Requested index not ordered, doesn't matter if existing index is ordered. |
894 | // Upgrade type if required. |
895 | if (index.type.compareTo(type)>0) { |
896 | ((AbstractIndex<T,PK,V,S>) index).upgrade(type, ordered, comparator); |
897 | } |
898 | return (Index<T, PK, V, S>) index; |
899 | } |
900 | } |
901 | } |
902 | } |
903 | AbstractIndex<T, PK, V, S> ret = createIndex(predicate, extractor, type, ordered, comparator); |
904 | for (Handle<T, PK, S> hi: getNoPkStore()) { |
905 | if (hi.isValid()) { |
906 | ret.update(hi, true); |
907 | } |
908 | } |
909 | getIndices().add(ret); |
910 | return ret.createProxy(); |
911 | } finally { |
912 | writeLock().unlock(); |
913 | } |
914 | |
915 | } |
916 | |
917 | private final Extractor<T,T, S> SELF_EXTRACTOR = new IndexedExtractor<T, S>(0); |
918 | |
919 | protected class QueryCallable<V> implements Callable<Boolean>, Serializable { |
920 | |
921 | private Predicate<T, S> selector; |
922 | private Handle<T, PK, S> handle; |
923 | private QueryTask<V, PK, S> query; |
924 | private S storeDeputy; |
925 | private Extractor<T, V, S> extractor; |
926 | |
927 | @SuppressWarnings("unchecked") |
928 | public QueryCallable( |
929 | Handle<T, PK, S> handle, |
930 | Predicate<T, S> selector, |
931 | Extractor<T, V, S> extractor, |
932 | QueryTask<V,PK, S> query, |
933 | S storeDeputy) { |
934 | this.handle = handle; |
935 | this.selector = selector; |
936 | this.query = query; |
937 | this.storeDeputy = storeDeputy.createUnmodifiableFacade(); |
938 | this.extractor = extractor; |
939 | if (this.extractor==null) { |
940 | this.extractor = (Extractor<T, V, S>) SELF_EXTRACTOR; |
941 | } |
942 | } |
943 | |
944 | /** |
945 | * Updates object and returns true if predicate matches. |
946 | */ |
947 | @Override |
948 | public Boolean call() throws Exception { |
949 | if (handle.isValid() && (selector==null || handle.extract(selector))) { |
950 | query.execute(storeDeputy, handle.extract(extractor), handle.getPrimaryKey()); |
951 | return true; |
952 | } |
953 | return false; |
954 | } |
955 | |
956 | } |
957 | |
958 | @Override |
959 | public int queryAll(final QueryTask<T, PK, S> query) { |
960 | readLock().lock(); |
961 | try { |
962 | Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>(); |
963 | for (Handle<T, PK, S> h: getNoPkStore()) { |
964 | queryResults.add(getExecutorService().submit(new QueryCallable<T>(h, null, null, query, createDeputy()))); |
965 | } |
966 | int ret = 0; |
967 | for (Future<Boolean> f: queryResults) { |
968 | try { |
969 | if (f.get()) { |
970 | ++ret; |
971 | } |
972 | } catch (Exception e) { |
973 | throw new StoreException(e); |
974 | } |
975 | } |
976 | return ret; |
977 | } finally { |
978 | readLock().unlock(); |
979 | } |
980 | } |
981 | |
982 | @Override |
983 | public int query(Predicate<T, S> selector, QueryTask<T, PK, S> query) { |
984 | readLock().lock(); |
985 | try { |
986 | IndexizationResult ir = indexize(selector, null, false, null); |
987 | Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>(); |
988 | for (Handle<T, PK, S> h: ir.getSource()) { |
989 | queryResults.add(getExecutorService().submit(new QueryCallable<T>(h, ir.getSelector(), null, query, createDeputy()))); |
990 | } |
991 | int ret = 0; |
992 | for (Future<Boolean> f: queryResults) { |
993 | try { |
994 | if (f.get()) { |
995 | ++ret; |
996 | } |
997 | } catch (Exception e) { |
998 | throw new StoreException(e); |
999 | } |
1000 | } |
1001 | |
1002 | return ret; |
1003 | } finally { |
1004 | readLock().unlock(); |
1005 | } |
1006 | } |
1007 | |
1008 | @Override |
1009 | public <V> int query( |
1010 | Predicate<T, S> selector, |
1011 | Extractor<T, V, S> extractor, |
1012 | QueryTask<V, PK, S> query) { |
1013 | |
1014 | readLock().lock(); |
1015 | try { |
1016 | IndexizationResult ir = indexize(selector, null, false, null); |
1017 | Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>(); |
1018 | for (Handle<T, PK, S> h: ir.getSource()) { |
1019 | queryResults.add(getExecutorService().submit(new QueryCallable<V>(h, ir.getSelector(), extractor, query, createDeputy()))); |
1020 | } |
1021 | int ret = 0; |
1022 | for (Future<Boolean> f: queryResults) { |
1023 | try { |
1024 | if (f.get()) { |
1025 | ++ret; |
1026 | } |
1027 | } catch (Exception e) { |
1028 | throw new StoreException(e); |
1029 | } |
1030 | } |
1031 | |
1032 | return ret; |
1033 | } finally { |
1034 | readLock().unlock(); |
1035 | } |
1036 | } |
1037 | |
1038 | protected class QueryMultipleCallable<V> implements Callable<Collection<Future<Integer>>>, Serializable { |
1039 | |
1040 | private Predicate<T, S> selector; |
1041 | private Handle<T, PK, S> handle; |
1042 | private QueryTask<V, PK, S> query; |
1043 | private S storeDeputy; |
1044 | private Extractor<T, Iterable<V>, S> extractor; |
1045 | private Predicate<V, S> valueSelector; |
1046 | |
1047 | public QueryMultipleCallable( |
1048 | Handle<T, PK, S> handle, |
1049 | Predicate<T, S> selector, |
1050 | Extractor<T, Iterable<V>, S> extractor, |
1051 | Predicate<V, S> valueSelector, |
1052 | QueryTask<V,PK,S> query, |
1053 | S storeDeputy) { |
1054 | this.handle = handle; |
1055 | this.selector = selector; |
1056 | this.query = query; |
1057 | this.storeDeputy = storeDeputy; |
1058 | this.extractor = extractor; |
1059 | this.valueSelector = valueSelector; |
1060 | } |
1061 | |
1062 | /** |
1063 | * Updates object and returns true if predicate matches. |
1064 | */ |
1065 | @Override |
1066 | public Collection<Future<Integer>> call() throws Exception { |
1067 | if (handle.isValid() && (selector==null || handle.extract(selector))) { |
1068 | Collection<Future<Integer>> queryItemTasks = new LinkedList<Future<Integer>>(); |
1069 | for (V v: handle.extract(extractor)) { |
1070 | queryItemTasks.add(getExecutorService().submit(new QueryItemCallable<V>(v, valueSelector, handle.getPrimaryKey(), query, storeDeputy))); |
1071 | } |
1072 | return queryItemTasks; |
1073 | } |
1074 | return Collections.emptyList(); |
1075 | } |
1076 | |
1077 | } |
1078 | |
1079 | protected class QueryItemCallable<V> implements Callable<Integer> { |
1080 | |
1081 | private V v; |
1082 | private PK primaryKey; |
1083 | private QueryTask<V, PK, S> query; |
1084 | private S storeDeputy; |
1085 | private Predicate<V, S> valueSelector; |
1086 | |
1087 | public QueryItemCallable( |
1088 | V v, |
1089 | Predicate<V, S> valueSelector, |
1090 | PK primaryKey, QueryTask<V, PK, S> query, |
1091 | S storeDeputy) { |
1092 | this.v = v; |
1093 | this.valueSelector = valueSelector; |
1094 | this.primaryKey = primaryKey; |
1095 | this.query = query; |
1096 | this.storeDeputy = storeDeputy.createUnmodifiableFacade(); |
1097 | } |
1098 | |
1099 | @Override |
1100 | public Integer call() throws Exception { |
1101 | if (valueSelector==null || valueSelector.extract(storeDeputy, null, Util.wrap(v))) { |
1102 | query.execute(storeDeputy, v, primaryKey); |
1103 | } |
1104 | return 1; |
1105 | } |
1106 | |
1107 | } |
1108 | |
1109 | @Override |
1110 | public <V> int queryMultiple( |
1111 | Predicate<T, S> selector, |
1112 | Extractor<T, Iterable<V>, S> extractor, |
1113 | Predicate<V, S> valueSelector, |
1114 | QueryTask<V, PK, S> query) { |
1115 | readLock().lock(); |
1116 | try { |
1117 | IndexizationResult ir = indexize(selector, null, false, null); |
1118 | Collection<Future<Collection<Future<Integer>>>> queryResults = new ArrayList<Future<Collection<Future<Integer>>>>(); |
1119 | for (Handle<T, PK, S> h: ir.getSource()) { |
1120 | queryResults.add(getExecutorService().submit(new QueryMultipleCallable<V>(h, ir.getSelector(), extractor, valueSelector, query, createDeputy()))); |
1121 | } |
1122 | Collection<Future<Integer>> itemResults = new LinkedList<Future<Integer>>(); |
1123 | for (Future<Collection<Future<Integer>>> f: queryResults) { |
1124 | try { |
1125 | itemResults.addAll(f.get()); |
1126 | } catch (Exception e) { |
1127 | throw new StoreException(e); |
1128 | } |
1129 | } |
1130 | |
1131 | int ret=0; |
1132 | for (Future<Integer> f: itemResults) { |
1133 | try { |
1134 | ret+=f.get(); |
1135 | } catch (Exception e) { |
1136 | throw new StoreException(e); |
1137 | } |
1138 | } |
1139 | return ret; |
1140 | } finally { |
1141 | readLock().unlock(); |
1142 | } |
1143 | } |
1144 | |
1145 | /** |
1146 | * This method is invoked by handle implementations. |
1147 | * It shall not be invoked by client code. |
1148 | * @param handle |
1149 | * @param isNew |
1150 | */ |
1151 | protected void updateIndices(Handle<T, PK, S> handle, boolean isNew) { |
1152 | Iterator<AbstractIndex<T, PK, ?, S>> it = getIndices().iterator(); |
1153 | while (it.hasNext()) { |
1154 | ((AbstractIndex<T, PK, ?, S>) it.next()).update(handle, isNew); |
1155 | } |
1156 | } |
1157 | |
1158 | @Override |
1159 | public Iterator<T> iterator() { |
1160 | return getAll().iterator(); |
1161 | } |
1162 | |
1163 | @Override |
1164 | public S createView( |
1165 | Predicate<T, S> selector, |
1166 | ViewType viewType) { |
1167 | |
1168 | switch (viewType) { |
1169 | case LIVE: |
1170 | return createLiveView(selector); |
1171 | case SYNCHRONOUS: |
1172 | case ASYNCHRONOUS: |
1173 | case LAZY: |
1174 | default: |
1175 | throw new UnsupportedOperationException("View type is not supported: "+viewType); |
1176 | } |
1177 | } |
1178 | |
1179 | protected abstract S createLiveView(Predicate<T, S> selector); |
1180 | |
1181 | // protected |
1182 | |
1183 | } |