package com.hammurapi.store.local;

import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.hammurapi.common.ObservableConverter;
import com.hammurapi.common.concurrent.CallerThreadExecutorService;
import com.hammurapi.extract.Extractor;
import com.hammurapi.extract.Predicate;
import com.hammurapi.store.AbstractIndex;
import com.hammurapi.store.AbstractStore;
import com.hammurapi.store.DeputyReadWriteLock;
import com.hammurapi.store.Index.Type;
import com.hammurapi.store.Store;
import com.hammurapi.store.StoreException;
import com.hammurapi.store.TrackingLock;
import com.hammurapi.store.local.LocalHandle.HandleStrength;

/**
 * In-memory object store.
 * <p>
 * Stored objects are wrapped in handles which are kept either in a map keyed by primary key
 * (when a primary key extractor is configured) or in a plain collection (when it is not).
 * Handles invalidated by removals are purged lazily by a cleanup task, see {@link #onRemoved(int)}.
 *
 * @author Pavel Vlasov
 *
 * @param <T> Type of stored objects.
 * @param <PK> Primary key type.
 * @param <S> Store type.
 */
public abstract class LocalStoreBase<T, PK, S extends Store<T,PK,S>> extends AbstractStore<T, PK, S> {

    /**
     * Store configuration.
     * <p>
     * The store constructor works on a clone of the configuration passed to it, so setters invoked
     * on the original instance after the store is constructed do not affect that store.
     *
     * @author Pavel Vlasov
     *
     */
    public static class Config<T,PK,S extends Store<T,PK,S>> implements Cloneable {

        // Instance field (not static) because it depends on the type parameters of the configuration.
        private final HandleFactory<T,PK,S,LocalHandle<T,PK,S>> DEFAULT_HANDLE_FACTORY = new HandleFactory<T, PK, S, LocalHandle<T,PK,S>>() {

            @Override
            public LocalHandle<T, PK, S> createHandle(
                    S store,
                    T obj,
                    PK primaryKey,
                    Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache,
                    Predicate<T, S>[] validators,
                    HandleStrength handleStrength,
                    boolean cacheExtracted) {
                return new LocalHandle<T,PK,S>((LocalStoreBase<T, PK, S>) store, obj, primaryKey, cache, validators, handleStrength, cacheExtracted);
            }

        };

        private long cleanupTaskThreshold = CLEANUP_TASK_THRESHOLD;
        private ExecutorService executorService;
        private Extractor<T, PK, S> primaryKeyExtractor;
        private boolean cacheExtracted = true;
        private HandleStrength handleStrength = HandleStrength.STRONG;
        private boolean weakPrimaryKey;
        private Map<PK, Handle<T,PK,S>> pkStore;
        private Collection<Handle<T,PK,S>> noPkStore;
        private HandleFactory<T, PK, S, LocalHandle<T,PK,S>> handleFactory = DEFAULT_HANDLE_FACTORY;
        private ObservableConverter<T> observableConverter;
//        private TimeUnit statsTimeUnit;
//        private boolean collectStats;

        // TODO Asynch index exception listener.
//        BackingStore<T, PK> backingStore;
//        int writeBehindBuffer;
//        long writeBehindDelay;
//        TimeUnit writeBehindTimeUnit;

//        /**
//         * If time unit is set, store collects execution time statistics.
//         */
//        public void setStatsTimeUnit(TimeUnit statsTimeUnit) {
//            this.statsTimeUnit = statsTimeUnit;
//        }
//
//        public TimeUnit getStatsTimeUnit() {
//            return statsTimeUnit;
//        }
//
//        /**
//         * If set to true, store collects execution statistics.
//         * @param collectStats
//         */
//        public void setCollectStats(boolean collectStats) {
//            this.collectStats = collectStats;
//        }
//
//        public boolean isCollectStats() {
//            return collectStats;
//        }

        /**
         * @param cleanupTaskThreshold When number of invalidated (removed) handles exceeds this
         * threshold, the store performs cleanup of internal collections.
         */
        public void setCleanupTaskThreshold(long cleanupTaskThreshold) {
            this.cleanupTaskThreshold = cleanupTaskThreshold;
        }

        /**
         * @return Cleanup task threshold.
         */
        public long getCleanupTaskThreshold() {
            return cleanupTaskThreshold;
        }

        /**
         * Creates a shallow copy of this configuration: the copy references the same executor service,
         * extractor, backing collections, handle factory and converter as the original.
         */
        @Override
        public Object clone() throws CloneNotSupportedException {
            return super.clone();
        }

        /**
         * @return Executor service. If null, all operations are performed in the caller thread.
         */
        public ExecutorService getExecutorService() {
            return executorService;
        }

        /**
         *
         * @param executorService Executor service. If null, all operations are performed in the caller thread.
         */
        public void setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }

        /**
         * @return Primary key extractor, or null if the store has no primary key.
         */
        public Extractor<T, PK, S> getPrimaryKeyExtractor() {
            return primaryKeyExtractor;
        }

        /**
         * @param primaryKeyExtractor Primary key extractor. Can be null.
         */
        public void setPrimaryKeyExtractor(
                Extractor<T, PK, S> primaryKeyExtractor) {
            this.primaryKeyExtractor = primaryKeyExtractor;
        }

        /**
         * @return true if extracted values are cached.
         */
        public boolean isCacheExtracted() {
            return cacheExtracted;
        }

        /**
         * @param cacheExtracted If true, values extracted from stored objects are cached until object is updated
         * through update() or put() methods. Default value is true.
         */
        public void setCacheExtracted(boolean cacheExtracted) {
            this.cacheExtracted = cacheExtracted;
        }

        /**
         * @return Handle strength.
         */
        public HandleStrength getHandleStrength() {
            return handleStrength;
        }

        /**
         * @param handleStrength Handle strength. Default handle strength is STRONG.
         */
        public void setHandleStrength(HandleStrength handleStrength) {
            this.handleStrength = handleStrength;
        }

        /**
         * @return true if a WeakHashMap shall be used as the default primary key store.
         */
        public boolean isWeakPrimaryKey() {
            return weakPrimaryKey;
        }

        /**
         * @param weakPrimaryKey If true, the store uses WeakHashMap if primary key is set and
         * pkStore is not set explicitly.
         */
        public void setWeakPrimaryKey(boolean weakPrimaryKey) {
            this.weakPrimaryKey = weakPrimaryKey;
        }

        /**
         * @return Map from primary keys to handles, or null if it was not set.
         */
        public Map<PK, Handle<T, PK, S>> getPkStore() {
            return pkStore;
        }

        /**
         * @param pkStore Explicitly provided pkStore instance.
         */
        public void setPkStore(Map<PK, Handle<T, PK, S>> pkStore) {
            this.pkStore = pkStore;
        }

        /**
         * @return Collection of handles used when there is no primary key, or null if it was not set.
         */
        public Collection<Handle<T, PK, S>> getNoPkStore() {
            return noPkStore;
        }

        /**
         * @param noPkStore Explicitly provided noPkStore instance.
         */
        public void setNoPkStore(Collection<Handle<T, PK, S>> noPkStore) {
            this.noPkStore = noPkStore;
        }

        /**
         * @param handleFactory Factory used by the store to create handles. The default factory creates
         * {@link LocalHandle} instances.
         */
        public void setHandleFactory(HandleFactory<T, PK, S, LocalHandle<T, PK, S>> handleFactory) {
            this.handleFactory = handleFactory;
        }

        /**
         * @return Factory used by the store to create handles.
         */
        public HandleFactory<T, PK, S, LocalHandle<T, PK, S>> getHandleFactory() {
            return handleFactory;
        }

        /**
         * @param observableConverter Observable converter. Not used by this class directly.
         */
        public void setObservableConverter(ObservableConverter<T> observableConverter) {
            this.observableConverter = observableConverter;
        }

        /**
         * @return Observable converter, or null if it was not set.
         */
        public ObservableConverter<T> getObservableConverter() {
            return observableConverter;
        }
    }

    /** Default value of {@link Config#getCleanupTaskThreshold()}. */
    private static final long CLEANUP_TASK_THRESHOLD = 1000;

    /** Private copy of the configuration passed to the constructor, with defaults filled in. */
    protected Config<T,PK,S> config;

    /**
     * Clones the configuration and fills in defaults which were not set explicitly:
     * <ul>
     * <li>Caller thread executor service if no executor service is set.</li>
     * <li>A linked list as noPkStore if there is no primary key extractor.</li>
     * <li>A WeakHashMap (weak primary key) or a ConcurrentHashMap as pkStore if there is a primary key extractor.</li>
     * </ul>
     *
     * @param config Store configuration, must not be null.
     * @throws StoreException If the configuration cannot be cloned.
     */
    @SuppressWarnings("unchecked")
    protected LocalStoreBase(Config<T,PK,S> config) {
        try {
            this.config = (Config<T, PK, S>) config.clone();
        } catch (CloneNotSupportedException e) {
            throw new StoreException(e);
        }

        if (this.config.getExecutorService()==null) {
            this.config.setExecutorService(CallerThreadExecutorService.INSTANCE);
        }

        if (this.config.getPrimaryKeyExtractor()==null) {
            if (this.config.getNoPkStore()==null) {
                this.config.setNoPkStore(new LinkedList<Handle<T,PK,S>>() { // TODO - Address uniqueness

                    // Synchronized for situations when updaters perform inserts.
                    @Override
                    public synchronized boolean add(Handle<T,PK,S> e) {
                        return super.add(e);
                    }
                });
            }
        } else {
            if (this.config.getPkStore()==null) {
                if (this.config.isWeakPrimaryKey()) {
                    this.config.setPkStore(new WeakHashMap<PK, Handle<T,PK,S>>() {

                        // Synchronized for situations when updaters perform inserts.
                        @Override
                        public synchronized Handle<T,PK,S> put(PK key, Handle<T,PK,S> value) {
                            return super.put(key, value);
                        }

                    });
                } else {
                    this.config.setPkStore(new ConcurrentHashMap<PK, Handle<T,PK,S>>());
                }
            }
        }

    }

    // Declaration order matters: the tracking locks wrap the locks of storeLock.
    private final ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
    private final TrackingLock readLock = new LocalTrackingLock(storeLock.readLock());
    private final TrackingLock writeLock = new LocalTrackingLock(storeLock.writeLock());

    /**
     * @return Tracking wrapper around the store's read lock.
     */
    @Override
    public Lock readLock() {
        return readLock;
    }

    /**
     * @return Tracking wrapper around the store's write lock.
     */
    @Override
    public Lock writeLock() {
        return writeLock;
    }

    @Override
    protected Map<PK, Handle<T, PK, S>> getPkStore() {
        return config.getPkStore();
    }

    /**
     * @return The configured noPkStore if there is no primary key extractor, otherwise
     * the values view of the primary key store.
     */
    @Override
    protected Collection<Handle<T, PK, S>> getNoPkStore() {
        return getPrimaryKeyExtractor()==null ? config.getNoPkStore() : getPkStore().values();
    }

    @Override
    protected ExecutorService getExecutorService() {
        return config.getExecutorService();
    }


    /**
     * Creates a handle using the configured handle factory, handle strength and caching flag.
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Handle<T, PK, S> createHandle(
            T obj,
            PK primaryKey,
            Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache,
            Predicate<T, S>[] validators) {
        return config.getHandleFactory().createHandle((S) this, obj, primaryKey, cache, validators, config.getHandleStrength(), config.isCacheExtracted());
    }

    /**
     * Not supported by this store.
     *
     * @throws UnsupportedOperationException Always.
     */
    @Override
    protected Collection<Handle<T, PK, S>> createIndexCollection(
            boolean unique,
            boolean ordered,
            Comparator<T> comparator) {

        throw new UnsupportedOperationException();
    }

    /**
     * @return New concurrent map if caching of extracted values is enabled, null otherwise.
     */
    @Override
    protected Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> createCache() {
        if (config.isCacheExtracted()) {
            return new ConcurrentHashMap<S, Map<Extractor<T, ? super PK, S>, ? super PK>>();
        }
        return null;
    }

    /**
     * Creates a {@link DeputyReadWriteLock} on top of this store's read and write locks.
     * The lock state flags passed to the deputy lock are sampled at the time of the call.
     */
    @Override
    protected ReadWriteLock createMasterLock() {
        ReadWriteLock master = new ReadWriteLock() {

            @Override
            public Lock writeLock() {
                return writeLock;
            }

            @Override
            public Lock readLock() {
                return readLock;
            }
        };
        return new DeputyReadWriteLock(master, readLock.isLocked() || writeLock.isLocked(), writeLock.isLocked());
    }

    /** Number of removed handles which were reported but not purged from the store collections yet. */
    private final AtomicLong removedCounter = new AtomicLong(0);

    /** True while a cleanup task is submitted but has not started yet. */
    private final AtomicBoolean cleanupPending = new AtomicBoolean(false);

    /**
     * If there are a lot of cleared handles this method posts a task to compact store collections.
     * <p>
     * At most one cleanup task is kept pending: while the removed handles counter stays above the threshold
     * every removal would otherwise queue one more full scan of the store under the write lock.
     * The pending flag is cleared when the task starts, so removals which happen while the cleanup
     * is running can still request another pass.
     *
     * @param removed Number of handles removed by the operation which triggered this callback.
     */
    @Override
    protected void onRemoved(int removed) {
        if (removedCounter.addAndGet(removed)<=config.getCleanupTaskThreshold()) {
            return;
        }

        if (!cleanupPending.compareAndSet(false, true)) {
            return; // A cleanup task is already queued and will take care of these handles too.
        }

        boolean submitted = false;
        try {
            getExecutorService().submit(new Runnable() {

                @Override
                public void run() {
                    cleanupPending.set(false);
                    writeLock().lock();
                    int cleanedUp = 0;
                    try {
                        Iterator<Handle<T, PK, S>> it = getNoPkStore().iterator(); // Takes care of both map and collection.
                        while (it.hasNext()) {
                            Handle<T, PK, S> h = it.next();
                            if (!h.isValid()) {
                                it.remove();
                                ++cleanedUp;
                            }
                        }

                        for (AbstractIndex<T,? extends T,PK,?,S> index: getIndices()) {
                            index.cleanup();
                        }
                    } finally {
                        writeLock().unlock();
                        if (cleanedUp>0) {
                            removedCounter.addAndGet(-cleanedUp);
                        }
                    }
                }

            });
            submitted = true;
        } finally {
            if (!submitted) {
                // The executor refused the task - allow the next removal to try again.
                cleanupPending.set(false);
            }
        }
    }

    @Override
    public Extractor<T, PK, S> getPrimaryKeyExtractor() {
        return config.getPrimaryKeyExtractor();
    }

    private final Collection<AbstractIndex<T, ? extends T, PK, ?, S>> indices = new LinkedList<AbstractIndex<T,? extends T,PK,?,S>>();

    @Override
    protected Collection<AbstractIndex<T, ? extends T, PK, ?, S>> getIndices() {
        return indices;
    }

    /**
     * Creates a {@link LocalIndex} bound to this store.
     */
    @Override
    protected <V, ST extends T> AbstractIndex<T, ST, PK, V, S> createIndex(
            Predicate<T, S> predicate,
            Extractor<ST, V, S> extractor,
            Type type,
            boolean ordered,
            Comparator<V> comparator) {

        return new LocalIndex<T, ST, PK, V, S>(predicate, extractor, type, ordered, comparator, this);
    }

    /**
     * Handles an exception related to asynchronous index processing.
     * This implementation prints the stack trace to the standard error stream,
     * subclasses may override it to report the problem differently.
     *
     * @param e Exception to handle.
     */
    protected void handleIndexAsynchException(Exception e) {
        e.printStackTrace();
    }

    /**
     * To make it visible to LocalHandle.
     */
    @Override
    protected void updateIndices(Handle<T, PK, S> handle, boolean isNew) {
        super.updateIndices(handle, isNew);
    }

    @Override
    protected abstract S createDeputy();
}