001package com.hammurapi.store; 002 003import java.lang.reflect.InvocationHandler; 004import java.lang.reflect.Method; 005import java.lang.reflect.Proxy; 006import java.util.ArrayList; 007import java.util.Collection; 008import java.util.Collections; 009import java.util.Comparator; 010import java.util.Iterator; 011import java.util.LinkedList; 012import java.util.List; 013import java.util.Map; 014import java.util.concurrent.ExecutorService; 015import java.util.concurrent.atomic.AtomicBoolean; 016import java.util.concurrent.atomic.AtomicInteger; 017import java.util.concurrent.locks.Lock; 018 019import com.hammurapi.extract.Extractor; 020import com.hammurapi.extract.Predicate; 021import com.hammurapi.store.Store.Handle; 022 023public abstract class AbstractIndex<ST, T extends ST, PK, V, S extends Store<ST,PK,S>> implements OrderedIndex<ST, T, PK, V, S> { 024 025 private Predicate<ST, S> predicate; 026 private Extractor<T, V, S> extractor; 027 protected Index.Type type; 028 protected boolean ordered; 029 protected Comparator<V> comparator; 030 protected AbstractStore<ST, PK, S> store; 031 protected AtomicBoolean isCorrupt = new AtomicBoolean(false); 032 033 protected AbstractIndex( 034 Predicate<ST, S> predicate, 035 Extractor<T, V, S> extractor, 036 Index.Type type, 037 boolean ordered, 038 Comparator<V> comparator, 039 AbstractStore<ST,PK,S> store) { 040 this.predicate = predicate; 041 this.extractor = extractor; 042 this.type = type; 043 this.ordered = ordered; 044 this.comparator = comparator; 045 this.store = store; 046 } 047 048 /** 049 * @return Index unique store. If index is ordered, the index store 050 * shall be of type SortedMap. 051 */ 052 protected abstract Map<V, Handle<ST,PK,S>> getIndexUniqueStore(); 053 054 /** 055 * @return Index multi-store. If index is ordered, the index store 056 * shall be of type SortedMap. 057 */ 058 protected abstract Map<V, Collection<Handle<ST,PK,S>>> getIndexMultiStore(); 059 060 protected abstract Lock readLock(); 061 062 protected abstract Lock writeLock(); 063 064 /** 065 * Upgrades index type. 066 * @param type Asynchronous and lazy indices can be 067 * upgraded to synchronous. All index types can be upgraded 068 * to unique if current index data does not contain duplicates. 069 * @param ordered unordered index can be upgraded to ordered. 070 */ 071 public abstract void upgrade(Index.Type type, boolean ordered, Comparator<V> comparator); 072 073 Collection<Handle<ST,PK,S>> getHandles() { 074 checkCorrupt(); 075 applyUpdates(); 076 try { 077 Collection<Handle<ST,PK,S>> ret = new LinkedList<Handle<ST,PK,S>>(); 078 if (isUnique()) { 079 for (Handle<ST,PK,S> h: getIndexUniqueStore().values()) { 080 if (h.isValid()) { 081 ret.add(h); 082 } 083 } 084 } else { 085 for (Collection<Handle<ST,PK,S>> ch: getIndexMultiStore().values()) { 086 for (Handle<ST,PK,S> h: ch) { 087 if (h.isValid()) { 088 ret.add(h); 089 } 090 } 091 } 092 } 093 return ret; 094 } finally { 095 readLock().unlock(); 096 } 097 } 098 099 @SuppressWarnings("unchecked") 100 @Override 101 public Iterator<T> iterator() { 102 checkCorrupt(); 103 getStore().readLock().lock(); 104 try { 105 applyUpdates(); 106 try { 107 Collection<T> ret = new LinkedList<T>(); 108 if (isUnique()) { 109 for (Handle<ST,PK,S> h: getIndexUniqueStore().values()) { 110 if (h.isValid()) { 111 ret.add((T) h.get()); 112 } 113 } 114 } else { 115 for (Collection<Handle<ST,PK,S>> ch: getIndexMultiStore().values()) { 116 for (Handle<ST,PK,S> h: ch) { 117 if (h.isValid()) { 118 ret.add((T) h.get()); 119 } 120 } 121 } 122 } 123 return ret.iterator(); 124 } finally { 125 readLock().unlock(); 126 } 127 } finally { 128 getStore().readLock().unlock(); 129 } 130 } 131 132 @SuppressWarnings("unchecked") 133 @Override 134 public S getStore() { 135 return (S) store; 136 } 137 138 @Override 139 public Predicate<ST, S> getPredicate() { 140 return predicate; 141 } 142 143 @Override 144 public Extractor<T, V, S> getExtractor() { 145 return extractor; 146 } 147 148 @Override 149 public boolean isUnique() { 150 return Type.UNIQUE.equals(type); 151 } 152 153 @Override 154 public boolean isOrdered() { 155 return ordered; 156 } 157 158 @Override 159 public Comparator<V> getComparator() { 160 return comparator; 161 } 162 163 private Comparator<V> naturalComparator = new NaturalComparator<V>(); 164 165 protected abstract Collection<UpdateEntry> getPendingUpdates(); 166 167 /** 168 * Updates index 169 * @param changed Changed/added handle. 170 * @param isNew If true, then handle was added, updated otherwise. 171 */ 172 public void update(Handle<ST,PK,S> changed, boolean isNew) { 173 checkCorrupt(); 174 switch (type) { 175 case UNIQUE: 176 case SYNCHRONOUS: 177 writeLock().lock(); 178 try { 179 applyUpdate(changed, isNew); 180 } finally { 181 writeLock().unlock(); 182 } 183 break; 184 case ASYNCHRONOUS: 185 writeLock().lock(); 186 try { 187 getPendingUpdates().add(new UpdateEntry(changed, isNew)); 188 getExecutorService().submit(applyUpdatesTask); 189 } finally { 190 writeLock().unlock(); 191 } 192 break; 193 case LAZY: 194 writeLock().lock(); 195 try { 196 getPendingUpdates().add(new UpdateEntry(changed, isNew)); 197 } finally { 198 writeLock().unlock(); 199 } 200 break; 201 } 202 } 203 204 protected class UpdateEntry { 205 Handle<ST,PK,S> handle; 206 boolean isNew; 207 208 public UpdateEntry(Handle<ST, PK, S> handle, boolean isNew) { 209 super(); 210 this.handle = handle; 211 this.isNew = isNew; 212 } 213 214 @Override 215 public int hashCode() { 216 final int prime = 31; 217 int result = 1; 218 result = prime * result + ((handle == null) ? 0 : handle.hashCode()); 219 result = prime * result + (isNew ? 1231 : 1237); 220 return result; 221 } 222 223 @Override 224 public boolean equals(Object obj) { 225 if (this == obj) 226 return true; 227 if (obj == null) 228 return false; 229 if (getClass() != obj.getClass()) 230 return false; 231 UpdateEntry other = (UpdateEntry) obj; 232 if (handle == null) { 233 if (other.handle != null) 234 return false; 235 } else if (!handle.equals(other.handle)) 236 return false; 237 if (isNew != other.isNew) 238 return false; 239 return true; 240 } 241 242 } 243 244 protected abstract ExecutorService getExecutorService(); 245 246 protected void checkCorrupt() { 247 if (isCorrupt.get()) { 248 throw new StoreException("Index is corrupt"); 249 } 250 } 251 252 boolean isCorrupt() { 253 return isCorrupt.get(); 254 } 255 256 protected Runnable applyUpdatesTask = new Runnable() { 257 258 public void run() { 259 writeLock().lock(); 260 try { 261 Iterator<UpdateEntry> hit = getPendingUpdates().iterator(); 262 while (hit.hasNext()) { 263 UpdateEntry updateEntry = hit.next(); 264 applyUpdate(updateEntry.handle, updateEntry.isNew); 265 hit.remove(); 266 } 267 } catch (Exception e) { 268 isCorrupt.set(true); 269 handleAsynchException(e); 270 } finally { 271 writeLock().unlock(); 272 } 273 }; 274 }; 275 276 277 /** 278 * Applies pending updates, if any. This method acquires 279 * index write lock. If update operation is successful, 280 * then this method downgrades the lock to read lock. 281 * In the case of exception, write lock is released and 282 * read lock is not acquired. 283 */ 284 private void applyUpdates() { 285 writeLock().lock(); 286 try { 287 Iterator<UpdateEntry> hit = getPendingUpdates().iterator(); 288 while (hit.hasNext()) { 289 UpdateEntry updateEntry = hit.next(); 290 applyUpdate(updateEntry.handle, updateEntry.isNew); 291 hit.remove(); 292 } 293 readLock().lock(); 294 } finally { 295 writeLock().unlock(); 296 } 297 } 298 299 private AtomicInteger granularity = new AtomicInteger(); 300 301 /** 302 * This method is invoked within write lock, no need 303 * to acquire own lock. 304 * @param changed 305 */ 306 @SuppressWarnings("unchecked") 307 protected void applyUpdate(Handle<ST,PK,S> changed, boolean isNew) { 308 if (isUnique()) { 309 Map<V, Handle<ST, PK, S>> is = getIndexUniqueStore(); 310 if (!isNew) { 311 is.values().remove(changed); 312 } 313 if (predicate==null || changed.extract(predicate)) { 314 V key = extractor==null ? (V) changed.get() : changed.extract((Extractor<ST, V, S>) extractor); 315 if (is.containsKey(key)) { 316 throw new StoreException("Violation of unique index: "+this); 317 } 318 is.put(key, changed); 319 } 320 granularity.set(is.size()); 321 } else { 322 Map<V, Collection<Handle<ST, PK, S>>> is = getIndexMultiStore(); 323 if (!isNew) { 324 Iterator<Collection<Handle<ST, PK, S>>> vit = is.values().iterator(); 325 while (vit.hasNext()) { 326 Collection<Handle<ST, PK, S>> nextBucket = vit.next(); 327 if (nextBucket.remove(changed)) {; 328 if (nextBucket.isEmpty()) { 329 vit.remove(); 330 } 331 } 332 } 333 } 334 335 if (predicate==null || changed.extract(predicate)) { 336 V key = extractor==null ? (V) changed.get() : changed.extract((Extractor<ST, V, S>) extractor); 337 Collection<Handle<ST, PK, S>> bucket = is.get(key); 338 if (bucket == null) { 339 bucket = new LinkedList<Store.Handle<ST,PK,S>>(); 340 is.put(key, bucket); 341 } 342 bucket.add(changed); 343 } 344 345 granularity.set(getIndexMultiStore().size()); 346 } 347 348 } 349 350 public int getGranularity() { 351 return granularity.get(); 352 } 353 354 /** 355 * Handles exceptions thrown in asynchronous applyUpdates(). 356 * @param e 357 */ 358 protected abstract void handleAsynchException(Exception e); 359 360 /** 361 * Removes invalid handles from index store. This method 362 * operates under store's write lock, so it is not necessary 363 * to acquire own lock. 364 */ 365 // TODO - invoke from store cleanup asynchronously. 366 public void cleanup() { 367 if (isUnique()) { 368 Iterator<Handle<ST, PK, S>> hit = getIndexUniqueStore().values().iterator(); 369 while (hit.hasNext()) { 370 if (!hit.next().isValid()) { 371 hit.remove(); 372 } 373 } 374 } else { 375 Iterator<Collection<Handle<ST, PK, S>>> chit = getIndexMultiStore().values().iterator(); 376 while (chit.hasNext()) { 377 Collection<Handle<ST, PK, S>> ch = chit.next(); 378 Iterator<Handle<ST, PK, S>> hit = ch.iterator(); 379 while (hit.hasNext()) { 380 if (!hit.next().isValid()) { 381 hit.remove(); 382 } 383 } 384 if (ch.isEmpty()) { 385 chit.remove(); 386 } 387 } 388 } 389 } 390 391 private Iterable<ST> findMulti(V value) { 392 checkCorrupt(); 393 applyUpdates(); 394 try { 395 if (isUnique()) { // For upgraded 396 Handle<ST, PK, S> ret = getIndexUniqueStore().get(value); 397 if (ret==null || !ret.isValid()) { 398 return Collections.emptyList(); 399 } 400 return Collections.singleton(ret.get()); 401 } 402 403 Collection<Handle<ST, PK, S>> bucket = getIndexMultiStore().get(value); 404 if (bucket==null) { 405 return Collections.emptyList(); 406 } 407 Collection<ST> ret = new ArrayList<ST>(); 408 for (Handle<ST,PK,S> h: bucket) { 409 if (h.isValid()) { 410 ret.add(h.get()); 411 } 412 } 413 return ret; 414 } finally { 415 readLock().unlock(); 416 } 417 } 418 419 private ST findUnique(V value) { 420 checkCorrupt(); 421 applyUpdates(); 422 try { 423 Handle<ST, PK, S> ret = getIndexUniqueStore().get(value); 424 return ret!=null && ret.isValid() ? ret.get() : null; 425 } finally { 426 readLock().unlock(); 427 } 428 } 429 430 @SuppressWarnings("unchecked") 431 public Iterable<T> find(V from, V to, boolean fromInclusive, boolean toInclusive) { 432 checkCorrupt(); 433 applyUpdates(); 434 try { 435 Collection<T> ret = new LinkedList<T>(); 436 // Step 1 - key set to array. 437 Collection<V> keys = isUnique() ? getIndexUniqueStore().keySet() : getIndexMultiStore().keySet(); 438 List<V> keyList = keys instanceof List ? (List<V>) keys : new ArrayList<V>(keys); 439 Comparator<V> theComparator = getComparator()==null ? naturalComparator : getComparator(); 440 int fromIdx = Collections.binarySearch(keyList, from, theComparator); 441 if (fromIdx<0) { 442 // -Insertion point - 1 443 // Inclusiveness doesn't matter 444 fromIdx = -fromIdx - 1; 445 } else { 446 if (!fromInclusive) { 447 ++fromIdx; 448 } 449 } 450 451 int toIdx = Collections.binarySearch(keyList, to, theComparator); 452 if (toIdx<0) { 453 // -Insertion point - 1 454 // Inclusiveness doesn't matter 455 toIdx = -toIdx - 1; 456 } else { 457 if (!toInclusive) { 458 --toIdx; 459 } 460 } 461 462 for (int i=fromIdx; i<=toIdx; ++i) { 463 V currentKey = keyList.get(i); 464 465 if (isUnique()) { 466 Handle<ST, PK, S> handle = getIndexUniqueStore().get(currentKey); 467 if (handle.isValid()) { 468 ret.add((T) handle.get()); 469 } 470 } else { 471 for (Handle<ST,PK,S> h: getIndexMultiStore().get(currentKey)) { 472 if (h.isValid()) { 473 ret.add((T) h.get()); 474 } 475 } 476 } 477 } 478 return ret; 479 } finally { 480 readLock().unlock(); 481 } 482 } 483 484 private InvocationHandler invocationHandler = new InvocationHandler() { 485 486 @SuppressWarnings("unchecked") 487 @Override 488 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { 489 if (method.getName().equals("find") && method.getParameterTypes().length==1) { 490 if (UniqueIndex.class.equals(method.getDeclaringClass())) { 491 return findUnique((V) args[0]); 492 } 493 return findMulti((V) args[0]); 494 } 495 return method.invoke(AbstractIndex.this, args); 496 } 497 }; 498 499 /** 500 * @return proxy which implements interfaces appropriate for the index type. 501 */ 502 @SuppressWarnings("unchecked") 503 public Index<ST, T,PK,V,S> createProxy() { 504 if (isUnique()) { 505 if (isOrdered()) { 506 return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {OrderedIndex.class, UniqueIndex.class}, invocationHandler); 507 } else { 508 return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {UniqueIndex.class}, invocationHandler); 509 } 510 } else { 511 if (isOrdered()) { 512 return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {OrderedIndex.class, MultiIndex.class}, invocationHandler); 513 } else { 514 return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {MultiIndex.class}, invocationHandler); 515 } 516 } 517 } 518 519 @Override 520 public String toString() { 521 return "AbstractIndex [predicate=" + predicate + ", extractor=" 522 + extractor + ", type=" + type + ", ordered=" + ordered 523 + ", comparator=" + comparator + ", isCorrupt=" + isCorrupt 524 + "]"; 525 } 526 527 528 529}