package com.hammurapi.store;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.locks.ReadWriteLock;

import com.hammurapi.common.Util;
import com.hammurapi.extract.CommutativeOr;
import com.hammurapi.extract.ComparisonResult;
import com.hammurapi.extract.ComparisonResult.Type;
import com.hammurapi.extract.CompositePredicate;
import com.hammurapi.extract.Extractor;
import com.hammurapi.extract.IndexedExtractor;
import com.hammurapi.extract.Predicate;
import com.hammurapi.extract.True;

/**
 * Abstract base class for object stores.
 * <p>
 * Objects are wrapped into {@link Handle}s. Removal is done by marking a handle as removed;
 * handles may or may not be physically removed from the store and index collections at that
 * moment (see {@link #onRemoved(int)}).
 * <p>
 * Selection, extraction, update and query operations evaluate each handle in a task submitted
 * to {@link #getExecutorService()} and then wait for all tasks while holding the read or write lock.
 *
 * @author Pavel Vlasov
 *
 * @param <T> Type of stored objects.
 * @param <PK> Primary key type.
 * @param <S> Self-type - sub-type of store. Generic parameter for predicates, extractors, update tasks and views.
 * S should be an interface for master store, deputy store and views to implement.
 */
public abstract class AbstractStore<T, PK, S extends Store<T,PK,S>> implements Store<T, PK, S> {

    /**
     * @return Store for the mode with primary key.
     */
    protected abstract Map<PK, Handle<T, PK, S>> getPkStore();

    /**
     * @return Store for the mode without primary key or pkStore values collection.
     */
    protected abstract Collection<Handle<T, PK, S>> getNoPkStore();

    /**
     * @return Executor service for parallel processing.
     */
    protected abstract ExecutorService getExecutorService();

    /**
     * Creates handle.
     * @param obj Object to wrap.
     * @param primaryKey Primary key of the object, null in the mode without primary key.
     * @param cache Extractors cache, see {@link #createCache()}.
     * @param validators Validators passed to {@link #put(Object, Predicate...)}.
     * @return New handle.
     */
    protected abstract Handle<T, PK, S> createHandle(T obj, PK primaryKey, Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache, Predicate<T, S>[] validators);

    /**
     * Creates collection for index entries
     * @param unique Collection should be unique (set)
     * @param ordered Collection should be ordered by the comparator or by natural order.
     * @param comparator Comparator for ordered collections.
     * @return New collection for index entries.
     */
    protected abstract Collection<Handle<T, PK, S>> createIndexCollection(boolean unique, boolean ordered, Comparator<T> comparator);

    /**
     * Creates handle without primary key and with a fresh extractors cache.
     * @param obj Object to wrap.
     * @param validators Validators passed to {@link #put(Object, Predicate...)}.
     * @return New handle.
     */
    protected Handle<T, PK, S> createHandle(T obj, Predicate<T, S>[] validators) {
        return createHandle(obj, null, createCache(), validators);
    }

    /**
     * @return Cache for extractors.
     */
    protected abstract Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> createCache();

    /**
     * Creates a store which encapsulates current call state (e.g. locks currently held) and can be
     * passed to executor for asynchronous execution. Delegates operate on behalf of the master
     * store. They don't acquire locks if the master store already holds them.
     * If the master store holds read lock then attempt of a deputy to acquire write lock will result in
     * StoreException.
     * @return Deputy store.
     */
    protected abstract S createDeputy();

    /**
     * @return Lock to be used by the master store.
     */
    protected abstract ReadWriteLock createMasterLock();

    /**
     * Invoked when some handles have been removed.
     * This operation shall remove invalidated handles from store and index collections.
     * This method is invoked outside of the write lock.
     * @param removed Number of removed handles.
     */
    protected abstract void onRemoved(int removed);

    /*
     * Implementation note - removal of objects is done by marking Handle as removed. Handles may or may not be removed from store and index collections
     * upon marking them as removed. Therefore there should be a "garbage collection" mechanism for removal of invalid handles.
     */

    /**
     * Converts an exception caught while waiting for a task into {@link StoreException}.
     * If the exception is {@link InterruptedException} the interrupt flag of the current thread
     * is restored first, so callers up the stack can still observe the interruption.
     * @param e Caught exception.
     * @return Exception to throw.
     */
    private static StoreException toStoreException(Exception e) {
        if (e instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new StoreException(e);
    }

    /**
     * Puts object to the store.
     * Validators are evaluated before the write lock is acquired; if any of them rejects the object
     * nothing is stored and null is returned.
     * In the primary key mode an existing handle with the same key is updated with the new object.
     * @param obj Object to store.
     * @param validators Validators to evaluate.
     * @return Handle of the stored object, or null if a validator rejected the object.
     */
    @SuppressWarnings("unchecked")
    @Override
    public Handle<T, PK, S> put(T obj, Predicate<T, S>... validators) {
        // Evaluate validators.
        for (Predicate<T, S> v: validators) {
            if (!v.extract((S) this, null, Util.wrap(obj))) {
                return null;
            }
        }
        writeLock().lock();
        try {
            if (getPrimaryKeyExtractor()==null) {
                Handle<T, PK, S> ret = createHandle(obj, validators);
                if (getNoPkStore().add(ret)) {
                    updateIndices(ret, true);
                } else {
                    // Not unique - return the handle which is already in the store.
                    for (Handle<T, PK, S> hi: getNoPkStore()) {
                        if (ret.equals(hi)) {
                            return hi;
                        }
                    }
                }
                return ret;
            } else {
                Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache = createCache();
                PK primaryKey = getPrimaryKeyExtractor().extract((S) this, cache, Util.wrap(obj));
                Handle<T, PK, S> hi = getPkStore().get(primaryKey);
                if (hi==null) {
                    hi = createHandle(obj, primaryKey, cache, validators);
                    updateIndices(hi, true);
                    getPkStore().put(primaryKey, hi);
                } else {
                    hi.update(obj);
                }
                return hi;
            }
        } finally {
            writeLock().unlock();
        }
    }

    /**
     * @return Snapshot of all objects whose handles are valid.
     */
    @Override
    public Iterable<T> getAll() {
        readLock().lock();
        try {
            Collection<T> ret = new ArrayList<T>();
            for (Handle<T, PK, S> hi: getNoPkStore()) {
                if (hi.isValid()) {
                    ret.add(hi.get());
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * @param primaryKey Primary key.
     * @return Object with given primary key or null if there is no valid handle for the key.
     * @throws IllegalStateException If the store is configured without primary key.
     */
    @Override
    public T getByPrimaryKey(PK primaryKey) {
        readLock().lock();
        try {
            if (getPrimaryKeyExtractor()==null) {
                throw new IllegalStateException("Object store is configured without primary key");
            }
            Handle<T, PK, S> hi = getPkStore().get(primaryKey);
            if (hi==null || !hi.isValid()) {
                // TODO - get from the backing store, update indices
                return null;
            }
            return hi.get();
        } finally {
            readLock().unlock();
        }
    }

    /**
     * @param selector Selector, null matches all valid handles.
     * @return Snapshot of objects matching the selector.
     */
    @Override
    public Iterable<T> get(Predicate<T, S> selector) {
        Collection<T> ret = new ArrayList<T>();
        for (Handle<T, PK, S> h: getHandles(selector)) {
            ret.add(h.get());
        }
        return ret;
    }

    /**
     * Task which evaluates selector against a handle.
     */
    protected class SelectorTask implements Callable<Handle<T, PK, S>>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;

        public SelectorTask(Handle<T, PK, S> handle, Predicate<T, S> selector) {
            this.handle = handle;
            this.selector = selector;
        }

        /**
         * Returns handle if it is valid and the predicate is null or matches, null otherwise.
         */
        @Override
        public Handle<T, PK, S> call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                return handle;
            }
            return null;
        }

    }

    /**
     * Result of applying index optimizations to data manipulation parameters.
     * @author Pavel Vlasov
     *
     */
    private class IndexizationResult {
        private Iterable<Handle<T, PK, S>> source;
        private Predicate<T, S> selector;
        private boolean ordered;

        public IndexizationResult(
                Iterable<Handle<T, PK, S>> source,
                Predicate<T, S> selector,
                boolean ordered) {
            super();
            this.source = source;
            this.selector = selector;
            this.ordered = ordered;
        }

        /**
         * @return Data to iterate over for further refinement.
         */
        public Iterable<Handle<T, PK, S>> getSource() {
            return source;
        }

        /**
         * @return Refined selector with parts addressed by indices already removed.
         * May be null if nothing is left to evaluate.
         */
        public Predicate<T, S> getSelector() {
            return selector;
        }

        /**
         * @return true if source is already ordered as required.
         */
        public boolean isOrdered() {
            return ordered;
        }

    }

    /**
     * Compares index predicate with the selector to find out whether the index can be used
     * as a source of handles for the selector.
     * @return {@link Type#EQUAL} or {@link Type#LESS_RESTRICTIVE} if the index can be used,
     * {@link Type#NOT_EQUAL} otherwise.
     */
    private <ST extends T> ComparisonResult.Type comparePredicates(
            Predicate<T, S> selector,
            AbstractIndex<T, ST,PK,?,S> index
            ) {

        boolean indexMatchesAll = index.getPredicate()==null || True.getInstance().equals(index.getPredicate());

        if (selector==null) {
            // Only an index which holds everything can serve "select all".
            return indexMatchesAll ? Type.LESS_RESTRICTIVE : Type.NOT_EQUAL;
        }

        if (indexMatchesAll) {
            return Type.LESS_RESTRICTIVE;
        }

        ComparisonResult cr = index.getPredicate().compareTo(selector);
        if (!cr.isOneToOneMapping()) {
            return Type.NOT_EQUAL;
        }

        switch (cr.getType()) {
            case EQUAL:
            case LESS_RESTRICTIVE:
                return cr.getType();
            default:
                return Type.NOT_EQUAL;
        }
    }

    /**
     * Checks whether the index is ordered by the same extractor and comparator as requested.
     * Null extractor/comparator are compatible only with null extractor/comparator of the index.
     * @param <V> Extracted value type.
     * @param extractor Requested extractor, may be null.
     * @param comparator Requested comparator, may be null.
     * @param index Index to check.
     * @return true if iteration order of the index is the requested order.
     */
    private <V, ST extends T> boolean compatibleOrder(Extractor<T, V, S> extractor, Comparator<V> comparator, AbstractIndex<T, ST, PK,V,S> index) {
        if (!index.isOrdered()) {
            return false;
        }
        // Null-safe comparisons: a null argument must not be dereferenced when the index has a non-null counterpart.
        boolean sameExtractor = extractor==null ? index.getExtractor()==null : extractor.equals(index.getExtractor());
        if (!sameExtractor) {
            return false;
        }
        return comparator==null ? index.getComparator()==null : comparator.equals(index.getComparator());
    }

    /**
     * Apply indexing optimizations.
     * Picks the best matching index (if any) as a source of handles and strips from the selector
     * the part already guaranteed by the index. For a {@link CommutativeOr} selector with several
     * parts and no directly usable index, each part is indexized separately and results are merged.
     * Falls back to the full scan of {@link #getNoPkStore()}.
     * @param selector Selector, may be null.
     * @param extractor Extractor used for ordering, may be null.
     * @param ordered true if the caller needs ordered results.
     * @param comparator Comparator for ordering, may be null.
     * @return Source of handles, remaining selector and ordering flag.
     */
    private <V> IndexizationResult indexize(
            Predicate<T, S> selector,
            final Extractor<T, V, S> extractor,
            final boolean ordered,
            Comparator<V> comparator) {

        class Candidate implements Comparable<Candidate> {
            AbstractIndex<T, ? extends T,PK,?,S> index;
            ComparisonResult.Type type;
            private boolean offersOrdering;

            public Candidate(AbstractIndex<T, ? extends T, PK, ?, S> index, Type type, boolean offersOrdering) {
                super();
                this.index = index;
                this.type = type;
                this.offersOrdering = offersOrdering;
            }

            /**
             * Better candidates compare as "less" so the best one is first after sorting.
             */
            @Override
            public int compareTo(Candidate o) {

                // If ordered, indices with compatible order come first, if not, unordered come first
                if (ordered) {
                    if (offersOrdering) {
                        if (!o.offersOrdering) {
                            return -1;
                        }
                    } else {
                        if (o.offersOrdering) {
                            return 1;
                        }
                    }
                } else {
                    if (index.isOrdered()) {
                        if (!o.index.isOrdered()) {
                            return 1;
                        }
                    } else {
                        if (o.index.isOrdered()) {
                            return -1;
                        }
                    }
                }

                // Predicates equal to selector come first.
                if (!o.type.equals(type)) {
                    return Type.EQUAL.equals(type) ? -1 : 1;
                }

                // Less restrictive predicates come after more restrictive.
                if (index.getPredicate()==null) {
                    if (o.index.getPredicate()!=null) {
                        return 1;
                    }
                } else {
                    if (o.index.getPredicate()==null) {
                        return -1;
                    }

                    ComparisonResult cr = index.getPredicate().compareTo(o.index.getPredicate());
                    if (cr.isOneToOneMapping()) {
                        switch (cr.getType()) {
                            case LESS_RESTRICTIVE:
                                return 1;
                            case MORE_RESTRICTIVE:
                                return -1;
                            default:
                                break;
                        }
                    }
                }

                // Unique indices come first.
                if (index.isUnique()) {
                    if (!o.index.isUnique()) {
                        return -1;
                    }
                } else {
                    if (o.index.isUnique()) {
                        return 1;
                    }
                }

                // More costly index comes first
                Extractor<? extends T, ?, S> indexExtractor = index.getExtractor();
                Extractor<? extends T, ?, S> otherExtractor = o.index.getExtractor();
                double indexCost = indexExtractor==null ? 0 : indexExtractor.getCost();
                double otherCost = otherExtractor==null ? 0 : otherExtractor.getCost();
                if (indexCost>0 && otherCost>0) {
                    double thisWeight = indexCost*o.index.getGranularity();
                    double otherWeight = otherCost*index.getGranularity();
                    if (thisWeight>otherWeight) {
                        return -1;
                    }
                    if (thisWeight<otherWeight) {
                        // The mirrored case must yield the opposite sign to keep compareTo antisymmetric.
                        return 1;
                    }
                }

                int gDelta = o.index.getGranularity() - index.getGranularity();
                if (gDelta!=0) {
                    return gDelta;
                }

                // Apply synchronous first, lazy last
                // NOTE(review): at this point type equals o.type (see the check above), so this comparison
                // is always 0; presumably index types were meant to be compared here - confirm.
                int typeCmp = type.compareTo(o.type);
                if (typeCmp!=0) {
                    return typeCmp;
                }

                // Arbitrary tie-breaker; explicit comparison instead of subtraction to avoid int overflow.
                int thisHash = hashCode();
                int otherHash = o.hashCode();
                return thisHash<otherHash ? -1 : (thisHash==otherHash ? 0 : 1);
            }

        }

        List<Candidate> candidates = new ArrayList<Candidate>();
        for (AbstractIndex<T, ? extends T, PK,?,S> candidate: getIndices()) {
            ComparisonResult.Type type = comparePredicates(selector, candidate);
            if (!Type.NOT_EQUAL.equals(type)) {
                // Add no-predicate indexes only if they offer ordering.
                @SuppressWarnings("unchecked")
                boolean offersOrdering = ordered && compatibleOrder(extractor, comparator, (AbstractIndex<T,? extends T,PK,V,S>) candidate);
                if (candidate.getPredicate()!=null || offersOrdering) {
                    candidates.add(new Candidate(candidate, type, offersOrdering));
                }
            }
        }

        if (!candidates.isEmpty()) {
            Collections.sort(candidates);
            Candidate indexToUse = candidates.get(0);
            if (Type.EQUAL.equals(indexToUse.type)) {
                // Index predicate is the selector - nothing is left to evaluate.
                return new IndexizationResult(indexToUse.index.getHandles(), null, indexToUse.offersOrdering);
            } else if (selector instanceof CompositePredicate) {
                @SuppressWarnings("unchecked")
                Predicate<T, S> newSelector = ((CompositePredicate<T, ?, S, ?>) selector).remove(indexToUse.index.getPredicate());
                return new IndexizationResult(indexToUse.index.getHandles(), newSelector, indexToUse.offersOrdering);
            }
            // Less restrictive, not composite
            return new IndexizationResult(indexToUse.index.getHandles(), selector, indexToUse.offersOrdering);
        }

        /*
         * Or selector - we can use multiple indices to select, then we have to sort anyway.
         */
        if (selector instanceof CommutativeOr) {
            CommutativeOr<T, S> or = (CommutativeOr<T, S>) selector;
            if (or.getParts().size()>1) {
                Collection<Future<IndexizationResult>> indexizationResults = new LinkedList<Future<IndexizationResult>>();
                for (Predicate<T, S> part: or.getParts()) {
                    indexizationResults.add(getExecutorService().submit(new IndexizationTask<V>(part, extractor)));
                }

                Collection<Handle<T, PK, S>> ret = new LinkedList<Handle<T, PK, S>>();
                Collection<Future<Handle<T, PK, S>>> fret = new LinkedList<Future<Handle<T, PK, S>>>();

                for (Future<IndexizationResult> fir: indexizationResults) {
                    try {
                        IndexizationResult ir = fir.get();
                        if (ir.getSelector()==null || True.getInstance().equals(ir.getSelector())) {
                            // Index covers the part completely - take handles as they are.
                            for (Handle<T, PK, S> h: ir.getSource()) {
                                ret.add(h);
                            }
                        } else {
                            for (Handle<T, PK, S> h: ir.getSource()) {
                                fret.add(getExecutorService().submit(new SelectorTask(h, ir.getSelector())));
                            }
                        }
                    } catch (Exception e) {
                        throw toStoreException(e);
                    }
                }

                for (Future<Handle<T, PK, S>> fh: fret) {
                    try {
                        Handle<T, PK, S> h = fh.get();
                        if (h!=null) {
                            ret.add(h);
                        }
                    } catch (Exception e) {
                        throw toStoreException(e);
                    }
                }

                return new IndexizationResult(ret, null, false);
            }
        }

        // No usable index - full scan.
        return new IndexizationResult(getNoPkStore(), selector, false);
    }

    /**
     * Task which applies index optimizations to a part of an "or" selector.
     */
    protected class IndexizationTask<V> implements Callable<IndexizationResult> {

        private Predicate<T, S> selector;
        private Extractor<T, V, S> extractor;

        public IndexizationTask(Predicate<T, S> selector, Extractor<T, V, S> extractor) {
            this.selector = selector;
            this.extractor = extractor;
        }

        @Override
        public IndexizationResult call() throws Exception {
            return indexize(selector, extractor, false, null);
        }

    }

    /**
     * Selects handles under the read lock.
     * @param selector Selector, null matches all valid handles.
     * @return Snapshot of valid handles matching the selector.
     */
    protected Iterable<Handle<T, PK, S>> getHandles(Predicate<T, S> selector) {
        readLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Handle<T, PK, S>>> selectResults = new ArrayList<Future<Handle<T, PK, S>>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                selectResults.add(getExecutorService().submit(new SelectorTask(h, ir.getSelector())));
            }
            Collection<Handle<T, PK, S>> ret = new ArrayList<Handle<T, PK, S>>();
            for (Future<Handle<T, PK, S>> f: selectResults) {
                try {
                    Handle<T, PK, S> result = f.get();
                    if (result!=null) {
                        ret.add(result);
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Task which evaluates selector against a handle and extracts a value from matching handle.
     */
    protected class ExtractorTask<V> implements Callable<V>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;
        private Extractor<T, V, S> extractor;

        public ExtractorTask(Handle<T, PK, S> handle, Predicate<T, S> selector, Extractor<T, V, S> extractor) {
            this.handle = handle;
            this.selector = selector;
            this.extractor = extractor;
        }

        /**
         * Returns extracted value (the object itself if extractor is null) if the handle is valid
         * and the predicate is null or matches, null otherwise.
         */
        @SuppressWarnings("unchecked")
        @Override
        public V call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                if (extractor==null) {
                    return ((V) handle.get());
                }
                return handle.extract(extractor);
            }
            return null;
        }

    }

    /**
     * Selects objects and extracts values from them. Null extracted values are dropped.
     * @param selector Selector, null matches all valid handles.
     * @param extractor Extractor, null means the object itself.
     * @param ordered true to order results.
     * @param comparator Comparator for ordering, null means natural order.
     * @return Snapshot of extracted values.
     */
    @Override
    public <V> Iterable<V> get(
            Predicate<T, S> selector,
            Extractor<T, V, S> extractor,
            boolean ordered,
            Comparator<V> comparator) {

        readLock().lock();
        try {
            IndexizationResult ir = indexize(selector, extractor, ordered, comparator);
            Collection<Future<V>> extractResults = new ArrayList<Future<V>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                extractResults.add(getExecutorService().submit(new ExtractorTask<V>(h, ir.getSelector(), extractor)));
            }
            List<V> ret = new ArrayList<V>();
            for (Future<V> f: extractResults) {
                try {
                    V result = f.get();
                    if (result!=null) {
                        ret.add(result);
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            // Sort only if the index did not deliver handles in the requested order.
            if (ordered && !ir.isOrdered()) {
                if (comparator==null) {
                    // Trick.
                    Collections.sort(ret, new NaturalComparator<V>());
                } else {
                    Collections.sort(ret, comparator);
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Task which evaluates value selector against an extracted value.
     */
    protected class ValueSelectorTask<V> implements Callable<V>, Serializable {

        private Predicate<V, S> selector;
        private V value;

        public ValueSelectorTask(V value, Predicate<V, S> selector) {
            this.value = value;
            this.selector = selector;
        }

        /**
         * Returns value if predicate matches, null otherwise.
         */
        @Override
        public V call() throws Exception {
            if (selector.extract(createDeputy(), null, Util.wrap(value))) {
                return value;
            }
            return null;
        }

    }

    /**
     * Selects objects, extracts multiple values from each of them and optionally filters the values.
     * @param selector Selector, null matches all valid handles.
     * @param extractor Extractor of multiple values.
     * @param valueSelector Filter for extracted values, null accepts all values.
     * @param ordered true to order results.
     * @param comparator Comparator for ordering, null means natural order.
     * @return Snapshot of extracted values.
     */
    @Override
    public <V> Iterable<V> getMultiple(
            Predicate<T, S> selector,
            Extractor<T, Iterable<V>, S> extractor,
            Predicate<V, S> valueSelector,
            boolean ordered,
            Comparator<V> comparator) {
        readLock().lock();
        try {
            // TODO - index optimizations are not applied here, all handles are scanned.
            Collection<Future<Iterable<V>>> extractResults = new ArrayList<Future<Iterable<V>>>();
            for (Handle<T, PK, S> h: getNoPkStore()) {
                extractResults.add(getExecutorService().submit(new ExtractorTask<Iterable<V>>(h, selector, extractor)));
            }
            List<V> ret = new ArrayList<V>();
            if (valueSelector==null) {
                for (Future<Iterable<V>> f: extractResults) {
                    try {
                        Iterable<V> result = f.get();
                        if (result!=null) {
                            for (V v: result) {
                                ret.add(v);
                            }
                        }
                    } catch (Exception e) {
                        throw toStoreException(e);
                    }
                }
            } else {
                List<Future<V>> fRet = new LinkedList<Future<V>>();
                for (Future<Iterable<V>> f: extractResults) {
                    try {
                        Iterable<V> result = f.get();
                        if (result!=null) {
                            for (V v: result) {
                                fRet.add(getExecutorService().submit(new ValueSelectorTask<V>(v, valueSelector)));
                            }
                        }
                    } catch (Exception e) {
                        throw toStoreException(e);
                    }
                }
                for (Future<V> f: fRet) {
                    try {
                        V result = f.get();
                        if (result!=null) {
                            ret.add(result);
                        }
                    } catch (Exception e) {
                        throw toStoreException(e);
                    }
                }
            }
            if (ordered) {
                if (comparator==null) {
                    // Trick.
                    Collections.sort(ret, new NaturalComparator<V>());
                } else {
                    Collections.sort(ret, comparator);
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Marks all handles as removed and clears the store.
     */
    @Override
    public void clear() {
        writeLock().lock();
        try {
            // Marking handles as removed.
            for (Handle<T, PK, S> h: getNoPkStore()) {
                h.remove();
            }
            // TODO - clear indices, remove from the backing store.
            if (getPrimaryKeyExtractor()==null) {
                getNoPkStore().clear();
            } else {
                getPkStore().clear();
            }
        } finally {
            writeLock().unlock();
        }
    }

    /**
     * Removes the first handle whose object equals to the argument.
     * Invalid handles encountered before the match are removed from the store collection as well.
     * @param obj Object to remove.
     * @return true if the object was found and removed.
     */
    @Override
    public boolean remove(T obj) {
        int cleanedUp = 0;
        writeLock().lock();
        try {
            Iterator<Handle<T, PK, S>> it = getNoPkStore().iterator();
            while (it.hasNext()) {
                Handle<T, PK, S> next = it.next();
                if (obj.equals(next.get())) {
                    next.remove(); // Handles shall just invalidate self without modifying the containing collection.
                    it.remove();
                    // TODO remove from indices, remove from the backing store.
                    return true;
                }
                // Clean up also directly removed (through handle.remove()) entries.
                if (!next.isValid()) {
                    it.remove();
                    ++cleanedUp;
                }
            }
            return false;
        } finally {
            writeLock().unlock();
            // NOTE(review): the count is passed negated; presumably it signals handles which were
            // already physically removed from the store collection - confirm against onRemoved() implementations.
            onRemoved(-cleanedUp);
        }
    }

    /**
     * Removes object by primary key.
     * @param primaryKey Primary key.
     * @return true if a valid handle with given key was removed.
     * @throws IllegalStateException If the store is configured without primary key.
     */
    public boolean removeByPrimaryKey(PK primaryKey) {
        writeLock().lock();
        try {
            if (getPrimaryKeyExtractor()==null) {
                throw new IllegalStateException("Object store is configured without primary key");
            }
            Handle<T, PK, S> hi = getPkStore().remove(primaryKey);
            if (hi==null || !hi.isValid()) {
                // TODO - get from the backing store, update indices
                return false;
            }
            hi.remove();
            return true;
        } finally {
            writeLock().unlock();
        }

    }

    /**
     * Task which removes a handle if it matches selector.
     */
    protected class RemoveTask implements Callable<Boolean>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;

        public RemoveTask(Handle<T, PK, S> handle, Predicate<T, S> selector) {
            this.handle = handle;
            this.selector = selector;
        }

        /**
         * Removes object and returns true if the handle is valid and predicate is null or matches.
         * Null predicate is what indexization produces when an index covers the selector completely.
         */
        @Override
        public Boolean call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                handle.remove(); // TODO - remove from the backing store, shall be handled by handle.remove().
                return true;
            }
            return false;
        }

    }

    /**
     * Removes objects matching selector.
     * @param selector Selector.
     * @return Number of removed objects.
     */
    @Override
    public int remove(Predicate<T, S> selector) {
        int ret = 0;
        writeLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Boolean>> removeResults = new ArrayList<Future<Boolean>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                removeResults.add(getExecutorService().submit(new RemoveTask(h, ir.getSelector())));
            }
            for (Future<Boolean> f: removeResults) {
                try {
                    if (f.get()) {
                        ++ret;
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            return ret;
        } finally {
            writeLock().unlock();
            onRemoved(ret);
        }
    }

    /**
     * Task which executes update task for a handle if it matches selector.
     */
    protected class UpdateCallable implements Callable<Boolean>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;
        private UpdateTask<T, PK, S> updater;
        private S storeDeputy;

        public UpdateCallable(Handle<T, PK, S> handle, Predicate<T, S> selector, UpdateTask<T,PK,S> updater, S storeDeputy) {
            this.handle = handle;
            this.selector = selector;
            this.updater = updater;
            this.storeDeputy = storeDeputy;
        }

        /**
         * Updates object and returns true if the handle is valid and predicate is null or matches.
         * Null predicate is what indexization produces when an index covers the selector completely.
         */
        @Override
        public Boolean call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                updater.execute(storeDeputy, handle); // TODO - update backing store.
                return true;
            }
            return false;
        }

    }

    /**
     * Executes update task for objects matching selector.
     * @param selector Selector.
     * @param updater Update task.
     * @return Number of objects passed to the update task.
     */
    @Override
    public int update(Predicate<T, S> selector, UpdateTask<T, PK, S> updater) {
        writeLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Boolean>> updateResults = new ArrayList<Future<Boolean>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                updateResults.add(getExecutorService().submit(new UpdateCallable(h, ir.getSelector(), updater, createDeputy())));
            }
            int ret = 0;
            for (Future<Boolean> f: updateResults) {
                try {
                    if (f.get()) {
                        ++ret;
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            return ret;
        } finally {
            writeLock().unlock();
        }
    }

    /**
     * @return Modifiable collection of store indices.
     */
    protected abstract Collection<AbstractIndex<T,? extends T,PK, ?, S>> getIndices();

    /**
     * Creates a new index. The index is populated and registered by {@link #addIndex(Predicate, Extractor, Index.Type, boolean, Comparator)}.
     */
    protected abstract <V, ST extends T> AbstractIndex<T,ST,PK,V,S> createIndex(
            Predicate<T, S> predicate,
            Extractor<ST, V, S> extractor,
            Index.Type type,
            boolean ordered,
            Comparator<V> comparator);

    /**
     * Adds index to the store. If there is already an index with equal predicate and extractor
     * (and compatible ordering) it is reused and, if required, upgraded; otherwise a new index is
     * created, populated with valid handles and registered.
     * @return Existing or new index.
     */
    @SuppressWarnings("unchecked")
    @Override
    public <V, ST extends T> Index<T,ST,PK,V,S> addIndex(
            Predicate<T, S> predicate,
            Extractor<ST, V, S> extractor,
            Index.Type type,
            boolean ordered,
            Comparator<V> comparator) {

        writeLock().lock();
        try {
            for (AbstractIndex<T, ? extends T, PK, ?, S> index: getIndices()) {
                if ((predicate==null && index.getPredicate()==null) || (predicate!=null && predicate.equals(index.getPredicate()))) {
                    // Compare with the extractor of the index, not with the index itself.
                    if ((extractor == null && index.getExtractor()==null) || (extractor!=null && extractor.equals(index.getExtractor()))) {
                        if (ordered) {
                            if (!index.isOrdered()) {
                                // Upgrade index to ordered
                                ((AbstractIndex<T, ST, PK,V,S>) index).upgrade(type, ordered, comparator);
                                return (Index<T, ST, PK, V, S>) index;
                            } else {
                                if ((index.getComparator()==null && comparator==null) || (comparator!=null && comparator.equals(index.getComparator()))) {
                                    if (index.type.compareTo(type)>0) {
                                        ((AbstractIndex<T,ST,PK,V,S>) index).upgrade(type, ordered, comparator);
                                    }
                                    return (Index<T, ST, PK, V, S>) index;
                                }
                            }
                        } else {
                            // Requested index not ordered, doesn't matter if existing index is ordered.
                            // Upgrade type if required.
                            if (index.type.compareTo(type)>0) {
                                ((AbstractIndex<T,ST,PK,V,S>) index).upgrade(type, ordered, comparator);
                            }
                            return (Index<T, ST, PK, V, S>) index;
                        }
                    }
                }
            }
            AbstractIndex<T, ST, PK, V, S> ret = createIndex(predicate, extractor, type, ordered, comparator);
            for (Handle<T, PK, S> hi: getNoPkStore()) {
                if (hi.isValid()) {
                    ret.update(hi, true);
                }
            }
            getIndices().add(ret);
            return ret.createProxy();
        } finally {
            writeLock().unlock();
        }

    }

    /** Extractor used by queries when no extractor is given. */
    private final Extractor<T,T, S> SELF_EXTRACTOR = new IndexedExtractor<T, S>(0);

    /**
     * Task which executes query task for a handle if it matches selector.
     */
    protected class QueryCallable<V> implements Callable<Boolean>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;
        private QueryTask<V, PK, S> query;
        private S storeDeputy;
        private Extractor<T, V, S> extractor;

        @SuppressWarnings("unchecked")
        public QueryCallable(
                Handle<T, PK, S> handle,
                Predicate<T, S> selector,
                Extractor<T, V, S> extractor,
                QueryTask<V,PK, S> query,
                S storeDeputy) {
            this.handle = handle;
            this.selector = selector;
            this.query = query;
            this.storeDeputy = storeDeputy.createUnmodifiableFacade();
            this.extractor = extractor;
            if (this.extractor==null) {
                this.extractor = (Extractor<T, V, S>) SELF_EXTRACTOR;
            }
        }

        /**
         * Executes query with the extracted value and returns true if the handle is valid
         * and predicate is null or matches.
         */
        @Override
        public Boolean call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                query.execute(storeDeputy, handle.extract(extractor), handle.getPrimaryKey());
                return true;
            }
            return false;
        }

    }

    /**
     * Executes query task for all valid objects.
     * @param query Query task.
     * @return Number of objects passed to the query task.
     */
    @Override
    public int queryAll(final QueryTask<T, PK, S> query) {
        readLock().lock();
        try {
            Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>();
            for (Handle<T, PK, S> h: getNoPkStore()) {
                queryResults.add(getExecutorService().submit(new QueryCallable<T>(h, null, null, query, createDeputy())));
            }
            int ret = 0;
            for (Future<Boolean> f: queryResults) {
                try {
                    if (f.get()) {
                        ++ret;
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Executes query task for objects matching selector.
     * @param selector Selector, null matches all valid handles.
     * @param query Query task.
     * @return Number of objects passed to the query task.
     */
    @Override
    public int query(Predicate<T, S> selector, QueryTask<T, PK, S> query) {
        readLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                queryResults.add(getExecutorService().submit(new QueryCallable<T>(h, ir.getSelector(), null, query, createDeputy())));
            }
            int ret = 0;
            for (Future<Boolean> f: queryResults) {
                try {
                    if (f.get()) {
                        ++ret;
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }

            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Executes query task for values extracted from objects matching selector.
     * @param selector Selector, null matches all valid handles.
     * @param extractor Extractor, null means the object itself.
     * @param query Query task.
     * @return Number of objects for which the query task was executed.
     */
    @Override
    public <V> int query(
            Predicate<T, S> selector,
            Extractor<T, V, S> extractor,
            QueryTask<V, PK, S> query) {

        readLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Boolean>> queryResults = new ArrayList<Future<Boolean>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                queryResults.add(getExecutorService().submit(new QueryCallable<V>(h, ir.getSelector(), extractor, query, createDeputy())));
            }
            int ret = 0;
            for (Future<Boolean> f: queryResults) {
                try {
                    if (f.get()) {
                        ++ret;
                    }
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }

            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * Task which extracts multiple values from a matching handle and submits a
     * {@link QueryItemCallable} for each of them.
     */
    protected class QueryMultipleCallable<V> implements Callable<Collection<Future<Integer>>>, Serializable {

        private Predicate<T, S> selector;
        private Handle<T, PK, S> handle;
        private QueryTask<V, PK, S> query;
        private S storeDeputy;
        private Extractor<T, Iterable<V>, S> extractor;
        private Predicate<V, S> valueSelector;

        public QueryMultipleCallable(
                Handle<T, PK, S> handle,
                Predicate<T, S> selector,
                Extractor<T, Iterable<V>, S> extractor,
                Predicate<V, S> valueSelector,
                QueryTask<V,PK,S> query,
                S storeDeputy) {
            this.handle = handle;
            this.selector = selector;
            this.query = query;
            this.storeDeputy = storeDeputy;
            this.extractor = extractor;
            this.valueSelector = valueSelector;
        }

        /**
         * Returns futures of per-value query tasks if the handle is valid and predicate is null
         * or matches, empty collection otherwise.
         */
        @Override
        public Collection<Future<Integer>> call() throws Exception {
            if (handle.isValid() && (selector==null || handle.extract(selector))) {
                Collection<Future<Integer>> queryItemTasks = new LinkedList<Future<Integer>>();
                for (V v: handle.extract(extractor)) {
                    queryItemTasks.add(getExecutorService().submit(new QueryItemCallable<V>(v, valueSelector, handle.getPrimaryKey(), query, storeDeputy)));
                }
                return queryItemTasks;
            }
            return Collections.emptyList();
        }

    }

    /**
     * Task which executes query task for a single extracted value if it matches value selector.
     */
    protected class QueryItemCallable<V> implements Callable<Integer> {

        private V v;
        private PK primaryKey;
        private QueryTask<V, PK, S> query;
        private S storeDeputy;
        private Predicate<V, S> valueSelector;

        public QueryItemCallable(
                V v,
                Predicate<V, S> valueSelector,
                PK primaryKey, QueryTask<V, PK, S> query,
                S storeDeputy) {
            this.v = v;
            this.valueSelector = valueSelector;
            this.primaryKey = primaryKey;
            this.query = query;
            this.storeDeputy = storeDeputy.createUnmodifiableFacade();
        }

        /**
         * Executes query and returns 1 if value selector is null or matches, returns 0 otherwise
         * so that rejected values are not counted by {@link AbstractStore#queryMultiple}.
         */
        @Override
        public Integer call() throws Exception {
            if (valueSelector==null || valueSelector.extract(storeDeputy, null, Util.wrap(v))) {
                query.execute(storeDeputy, v, primaryKey);
                return 1;
            }
            return 0;
        }

    }

    /**
     * Executes query task for each of multiple values extracted from objects matching selector.
     * @param selector Selector, null matches all valid handles.
     * @param extractor Extractor of multiple values.
     * @param valueSelector Filter for extracted values, null accepts all values.
     * @param query Query task.
     * @return Number of values for which the query task was executed.
     */
    @Override
    public <V> int queryMultiple(
            Predicate<T, S> selector,
            Extractor<T, Iterable<V>, S> extractor,
            Predicate<V, S> valueSelector,
            QueryTask<V, PK, S> query) {
        readLock().lock();
        try {
            IndexizationResult ir = indexize(selector, null, false, null);
            Collection<Future<Collection<Future<Integer>>>> queryResults = new ArrayList<Future<Collection<Future<Integer>>>>();
            for (Handle<T, PK, S> h: ir.getSource()) {
                queryResults.add(getExecutorService().submit(new QueryMultipleCallable<V>(h, ir.getSelector(), extractor, valueSelector, query, createDeputy())));
            }
            Collection<Future<Integer>> itemResults = new LinkedList<Future<Integer>>();
            for (Future<Collection<Future<Integer>>> f: queryResults) {
                try {
                    itemResults.addAll(f.get());
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }

            int ret=0;
            for (Future<Integer> f: itemResults) {
                try {
                    ret+=f.get();
                } catch (Exception e) {
                    throw toStoreException(e);
                }
            }
            return ret;
        } finally {
            readLock().unlock();
        }
    }

    /**
     * This method is invoked by handle implementations.
     * It shall not be invoked by client code.
     * @param handle Handle which was created or updated.
     * @param isNew true if the handle was just created.
     */
    protected void updateIndices(Handle<T, PK, S> handle, boolean isNew) {
        for (AbstractIndex<T, ? extends T, PK, ?, S> index: getIndices()) {
            index.update(handle, isNew);
        }
    }

    /**
     * @return Iterator over the snapshot returned by {@link #getAll()}.
     */
    @Override
    public Iterator<T> iterator() {
        return getAll().iterator();
    }

    /**
     * Creates view. Only {@link ViewType#LIVE} is supported.
     * @throws UnsupportedOperationException For other view types.
     */
    @Override
    public S createView(
            Predicate<T, S> selector,
            ViewType viewType) {

        switch (viewType) {
            case LIVE:
                return createLiveView(selector);
            case SYNCHRONOUS:
            case ASYNCHRONOUS:
            case LAZY:
            default:
                throw new UnsupportedOperationException("View type is not supported: "+viewType);
        }
    }

    /**
     * Creates live view.
     * @param selector View selector.
     * @return Live view.
     */
    protected abstract S createLiveView(Predicate<T, S> selector);

}