001package com.hammurapi.store.local;
002
003import java.util.Collection;
004import java.util.Comparator;
005import java.util.Iterator;
006import java.util.LinkedList;
007import java.util.Map;
008import java.util.WeakHashMap;
009import java.util.concurrent.ConcurrentHashMap;
010import java.util.concurrent.ExecutorService;
011import java.util.concurrent.atomic.AtomicLong;
012import java.util.concurrent.locks.Lock;
013import java.util.concurrent.locks.ReadWriteLock;
014import java.util.concurrent.locks.ReentrantReadWriteLock;
015
016import com.hammurapi.common.ObservableConverter;
017import com.hammurapi.common.concurrent.CallerThreadExecutorService;
018import com.hammurapi.extract.Extractor;
019import com.hammurapi.extract.Predicate;
020import com.hammurapi.store.AbstractIndex;
021import com.hammurapi.store.AbstractStore;
022import com.hammurapi.store.DeputyReadWriteLock;
023import com.hammurapi.store.Index.Type;
024import com.hammurapi.store.Store;
025import com.hammurapi.store.StoreException;
026import com.hammurapi.store.TrackingLock;
027import com.hammurapi.store.local.LocalHandle.HandleStrength;
028
/**
 * In-memory object store.
 * @author Pavel Vlasov
 *
 * @param <T> Type of stored objects.
 * @param <PK> Primary key type.
 * @param <S> Store type.
 */
035public abstract class LocalStoreBase<T, PK, S extends Store<T,PK,S>> extends AbstractStore<T, PK, S> {
036        
037        /**
038         * Store configuration.
039         * @author Pavel Vlasov
040         *
041         */
042        public static class Config<T,PK,S extends Store<T,PK,S>> implements Cloneable {
043                
044                private final HandleFactory<T,PK,S,LocalHandle<T,PK,S>> DEFAULT_HANDLE_FACTORY = new HandleFactory<T, PK, S, LocalHandle<T,PK,S>>() {
045
046                        @Override
047                        public LocalHandle<T, PK, S> createHandle(
048                                        S store,
049                                        T obj,
050                                        PK primaryKey,
051                                        Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache,
052                                        Predicate<T, S>[] validators,
053                                        HandleStrength handleStrength,
054                                        boolean cacheExtracted) {
055                                return new LocalHandle<T,PK,S>((LocalStoreBase<T, PK, S>) store, obj, primaryKey, cache, validators, handleStrength, cacheExtracted);
056                        }
057                        
058                };
059                
060                private long cleanupTaskThreshold = CLEANUP_TASK_THRESHOLD;             
061                private ExecutorService executorService;
062                private Extractor<T, PK, S> primaryKeyExtractor;
063                private boolean cacheExtracted = true;
064                private HandleStrength handleStrength = HandleStrength.STRONG;
065                private boolean weakPrimaryKey;
066                private Map<PK, Handle<T,PK,S>> pkStore;
067                private Collection<Handle<T,PK,S>> noPkStore;
068                private HandleFactory<T, PK, S, LocalHandle<T,PK,S>> handleFactory = DEFAULT_HANDLE_FACTORY;
069                private ObservableConverter<T> observableConverter;
070//              private TimeUnit statsTimeUnit;
071//              private boolean collectStats;
072                
073                // TODO Asynch index exception listener.
074//              BackingStore<T, PK> backingStore; 
075//              int writeBehindBuffer;
076//              long writeBehindDelay;
077//              TimeUnit writeBehindTimeUnit;
078                
079//              /**
080//               * If time unit is set, store collects execution time statistics.
081//               */
082//              public void setStatsTimeUnit(TimeUnit statsTimeUnit) {
083//                      this.statsTimeUnit = statsTimeUnit;
084//              }
085//              
086//              public TimeUnit getStatsTimeUnit() {
087//                      return statsTimeUnit;
088//              }
089//              
090//              /**
091//               * If set to true, store collects execution statistics.
092//               * @param collectStats
093//               */
094//              public void setCollectStats(boolean collectStats) {
095//                      this.collectStats = collectStats;
096//              }
097//              
098//              public boolean isCollectStats() {
099//                      return collectStats;
100//              }
101                
102                /**
103                 * @param cleanupTaskThreshold When number of invalidated (removed) handles exceeds this
104                 * threshold, the store performs cleanup of internal collections.
105                 */
106                public void setCleanupTaskThreshold(long cleanupTaskThreshold) {
107                        this.cleanupTaskThreshold = cleanupTaskThreshold;
108                }
109                
110                /**
111                 * @return Cleanup task threshold.
112                 */
113                public long getCleanupTaskThreshold() {
114                        return cleanupTaskThreshold;
115                }
116                
117                @Override
118                public Object clone() throws CloneNotSupportedException {
119                        return super.clone();
120                }
121
122                /**
123                 * @return Executor service. If null, all operations are performed in the caller thread.
124                 */
125                public ExecutorService getExecutorService() {
126                        return executorService;
127                }
128
129                /**
130                 * 
131                 * @param executorService Executor service. If null, all operations are performed in the caller thread.
132                 */
133                public void setExecutorService(ExecutorService executorService) {
134                        this.executorService = executorService;
135                }
136
137                
138                public Extractor<T, PK, S> getPrimaryKeyExtractor() {
139                        return primaryKeyExtractor;
140                }
141
142                /**
143                 * @param primaryKeyExtractor Primary key extractor. Can be null.
144                 */
145                public void setPrimaryKeyExtractor(
146                                Extractor<T, PK, S> primaryKeyExtractor) {
147                        this.primaryKeyExtractor = primaryKeyExtractor;
148                }
149
150                public boolean isCacheExtracted() {
151                        return cacheExtracted;
152                }
153
154                /**
155                 * @param cacheExtracted If true, values extracted from stored objects are cached until object is updated 
156                 * through update() or put() methods. Default value is true.
157                 */
158                public void setCacheExtracted(boolean cacheExtracted) {
159                        this.cacheExtracted = cacheExtracted;
160                }
161                
162                public HandleStrength getHandleStrength() {
163                        return handleStrength;
164                }
165
166                /**
167                 * @param handleStrength Handle strength. Default handle strength is HARD.
168                 */
169                public void setHandleStrength(HandleStrength handleStrength) {
170                        this.handleStrength = handleStrength;
171                }
172
173                public boolean isWeakPrimaryKey() {
174                        return weakPrimaryKey;
175                }
176
177                /**
178                 * @param weakStore If true, the store uses WeakHashMap if primary key is set and
179                 * pkStore is not set explicitly.
180                 */
181                public void setWeakPrimaryKey(boolean weakPrimaryKey) {
182                        this.weakPrimaryKey = weakPrimaryKey;
183                }
184
185                public Map<PK, Handle<T, PK, S>> getPkStore() {
186                        return pkStore;
187                }
188
189                /**
190                 * @param pkStore Explicitly provided pkStore instance.
191                 */
192                public void setPkStore(Map<PK, Handle<T, PK, S>> pkStore) {
193                        this.pkStore = pkStore;
194                }
195
196                public Collection<Handle<T, PK, S>> getNoPkStore() {
197                        return noPkStore;
198                }
199
200                /**
201                 * @param noPkStore Explicitly provided noPkStore instance.
202                 */
203                public void setNoPkStore(Collection<Handle<T, PK, S>> noPkStore) {
204                        this.noPkStore = noPkStore;
205                }
206                
207                public void setHandleFactory(HandleFactory<T, PK, S, LocalHandle<T, PK, S>> handleFactory) {
208                        this.handleFactory = handleFactory;
209                }
210                
211                public HandleFactory<T, PK, S, LocalHandle<T, PK, S>> getHandleFactory() {
212                        return handleFactory;
213                }
214                
215                public void setObservableConverter(ObservableConverter<T> observableConverter) {
216                        this.observableConverter = observableConverter;
217                }
218                
219                public ObservableConverter<T> getObservableConverter() {
220                        return observableConverter;
221                }
222        }
223
        /** Initial value of the cleanup task threshold of a newly created {@link Config}. */
        private static final long CLEANUP_TASK_THRESHOLD = 1000;

        /** Configuration of this store. The constructor assigns a clone of the configuration it receives. */
        protected Config<T,PK,S> config;
227        
228        @SuppressWarnings("unchecked")
229        protected LocalStoreBase(Config<T,PK,S> config) {
230                try {
231                        this.config = (Config<T, PK, S>) config.clone();
232                } catch (CloneNotSupportedException e) {
233                        throw new StoreException(e);
234                }
235                
236                if (this.config.getExecutorService()==null) {
237                        this.config.setExecutorService(CallerThreadExecutorService.INSTANCE);
238                }
239
240                if (this.config.getPrimaryKeyExtractor()==null) {
241                        if (this.config.getNoPkStore()==null) {
242                                this.config.setNoPkStore(new LinkedList<Handle<T,PK,S>>() { // TODO - Address uniqueness
243                                        
244                                        // Synchronized for situations when updaters perform inserts.
245                                        public synchronized boolean add(Handle<T,PK,S> e) {
246                                                return super.add(e);
247                                        }
248                                });                             
249                        }
250                } else {
251                        if (this.config.getPkStore()==null) {
252                                if (this.config.isWeakPrimaryKey()) {
253                                        this.config.setPkStore(new WeakHashMap<PK, Handle<T,PK,S>>() {
254                                                
255                                                // Synchronized for situations when updaters perform inserts.
256                                                public synchronized Handle<T,PK,S> put(PK key, Handle<T,PK,S> value) {
257                                                        return super.put(key, value);
258                                                };
259                                        
260                                        });
261                                } else {
262                                        this.config.setPkStore(new ConcurrentHashMap<PK, Handle<T,PK,S>>());
263                                }
264                        }
265                }
266                
267        }
268        
269        private ReentrantReadWriteLock storeLock = new ReentrantReadWriteLock();
270        private TrackingLock readLock = new LocalTrackingLock(storeLock.readLock());
271        private TrackingLock writeLock = new LocalTrackingLock(storeLock.writeLock());
272
273        @Override
274        public Lock readLock() {
275                return readLock;
276        }
277
278        @Override
279        public Lock writeLock() {
280                return writeLock;
281        }
282
283        @Override
284        protected Map<PK, Handle<T, PK, S>> getPkStore() {
285                return config.getPkStore();
286        }
287        
288        @Override
289        protected Collection<Handle<T, PK, S>> getNoPkStore() {
290                return getPrimaryKeyExtractor()==null ? config.getNoPkStore() : getPkStore().values();
291        }
292
293        @Override
294        protected ExecutorService getExecutorService() {
295                return config.getExecutorService();
296        }
297        
298
299        @SuppressWarnings("unchecked")
300        @Override
301        protected Handle<T, PK, S> createHandle(
302                        T obj,
303                        PK primaryKey,
304                        Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> cache, 
305                        Predicate<T, S>[] validators) {
306                return config.getHandleFactory().createHandle((S) this, obj, primaryKey, cache, validators, config.getHandleStrength(), config.isCacheExtracted());
307        }
308
309        @Override
310        protected Collection<Handle<T, PK, S>> createIndexCollection(
311                        boolean unique, 
312                        boolean ordered, 
313                        Comparator<T> comparator) {
314                
315                throw new UnsupportedOperationException();
316        }
317
318        @Override
319        protected Map<S, Map<Extractor<T, ? super PK, S>, ? super PK>> createCache() {
320                if (config.isCacheExtracted()) {
321                        return new ConcurrentHashMap<S, Map<Extractor<T, ? super PK, S>, ? super PK>>();
322                }
323                return null;
324        }
325
326        @Override
327        protected ReadWriteLock createMasterLock() {            
328                ReadWriteLock master = new ReadWriteLock() {
329                        
330                        @Override
331                        public Lock writeLock() {
332                                return writeLock;
333                        }
334                        
335                        @Override
336                        public Lock readLock() {
337                                return readLock;
338                        }
339                };
340                return new DeputyReadWriteLock(master, readLock.isLocked() || writeLock.isLocked(), writeLock.isLocked());
341        }
342        
343        private AtomicLong removedCounter = new AtomicLong(0);
344
345        /**
346         * If there is a lot of cleared handles this method posts a task to compact store collections..
347         */
348        @Override
349        protected void onRemoved(int removed) {
350                if (removedCounter.addAndGet(removed)>config.getCleanupTaskThreshold()) {
351                        getExecutorService().submit(new Runnable() {
352
353                                @Override
354                                public void run() {
355                                        writeLock().lock();
356                                        int cleanedUp = 0;
357                                        try {
358                                                Iterator<Handle<T, PK, S>> it = getNoPkStore().iterator(); // Takes care of both map and collection.
359                                                while (it.hasNext()) {
360                                                        Handle<T, PK, S> h = it.next();
361                                                        if (!h.isValid()) {
362                                                                it.remove();
363                                                                ++cleanedUp;
364                                                        }
365                                                }
366                                                
367                                                for (AbstractIndex<T,? extends T,PK,?,S> index: getIndices()) {
368                                                        index.cleanup();
369                                                }
370                                        } finally {
371                                                writeLock().unlock();
372                                                if (cleanedUp>0) {
373                                                        removedCounter.addAndGet(-cleanedUp);
374                                                }
375                                        }
376                                }
377                                
378                        });
379                }
380        }
381
382        @Override
383        public Extractor<T, PK, S> getPrimaryKeyExtractor() {
384                return config.getPrimaryKeyExtractor();
385        }
386        
387        private Collection<AbstractIndex<T, ? extends T, PK, ?, S>> indices = new LinkedList<AbstractIndex<T,? extends T,PK,?,S>>();
388
389        @Override
390        protected Collection<AbstractIndex<T, ? extends T, PK, ?, S>> getIndices() {
391                return indices;
392        }
393        
394        @Override
395        protected <V, ST extends T> AbstractIndex<T, ST, PK, V, S> createIndex(
396                        Predicate<T, S> predicate,
397                        Extractor<ST, V, S> extractor, 
398                        Type type,
399                        boolean ordered, 
400                        Comparator<V> comparator) {
401                
402                return new LocalIndex<T, ST, PK, V, S>(predicate, extractor, type, ordered, comparator, this);
403        }
404 
405        protected void handleIndexAsynchException(Exception e) {
406                e.printStackTrace();            
407        }
408        
        /**
         * Delegates to the superclass implementation without any additional logic.
         * Overridden only to make the method visible to LocalHandle.
         * @param handle Handle passed to the superclass implementation.
         * @param isNew Flag passed to the superclass implementation.
         */
        @Override
        protected void updateIndices(Handle<T, PK, S> handle, boolean isNew) {
                super.updateIndices(handle, isNew);
        }
416        
        /**
         * Re-declared abstract: concrete stores must provide the deputy of their own store type.
         * @return Deputy store.
         */
        @Override
        protected abstract S createDeputy();
419}