001package com.hammurapi.store;
002
003import java.lang.reflect.InvocationHandler;
004import java.lang.reflect.Method;
005import java.lang.reflect.Proxy;
006import java.util.ArrayList;
007import java.util.Collection;
008import java.util.Collections;
009import java.util.Comparator;
010import java.util.Iterator;
011import java.util.LinkedList;
012import java.util.List;
013import java.util.Map;
014import java.util.concurrent.ExecutorService;
015import java.util.concurrent.atomic.AtomicBoolean;
016import java.util.concurrent.atomic.AtomicInteger;
017import java.util.concurrent.locks.Lock;
018
019import com.hammurapi.extract.Extractor;
020import com.hammurapi.extract.Predicate;
021import com.hammurapi.store.Store.Handle;
022
023public abstract class AbstractIndex<ST, T extends ST, PK, V, S extends Store<ST,PK,S>> implements OrderedIndex<ST, T, PK, V, S> {
024
025        private Predicate<ST, S> predicate;
026        private Extractor<T, V, S> extractor; 
027        protected Index.Type type; 
028        protected boolean ordered;
029        protected Comparator<V> comparator;
030        protected AbstractStore<ST, PK, S> store;
031        protected AtomicBoolean isCorrupt = new AtomicBoolean(false);
032        
033        protected AbstractIndex(
034                        Predicate<ST, S> predicate,
035                        Extractor<T, V, S> extractor, 
036                        Index.Type type, 
037                        boolean ordered, 
038                        Comparator<V> comparator,
039                        AbstractStore<ST,PK,S> store) {
040                this.predicate = predicate;
041                this.extractor = extractor;
042                this.type = type;
043                this.ordered = ordered;
044                this.comparator = comparator;
045                this.store = store;
046        }
047        
048        /**
049         * @return Index unique store. If index is ordered, the index store
050         * shall be of type SortedMap.
051         */
052        protected abstract Map<V, Handle<ST,PK,S>> getIndexUniqueStore();
053        
054        /**
055         * @return Index multi-store. If index is ordered, the index store
056         * shall be of type SortedMap.
057         */
058        protected abstract Map<V, Collection<Handle<ST,PK,S>>> getIndexMultiStore();
059        
060        protected abstract Lock readLock();
061        
062        protected abstract Lock writeLock();
063        
064        /**
065         * Upgrades index type.
066         * @param type Asynchronous and lazy indices can be 
067         * upgraded to synchronous. All index types can be upgraded
068         * to unique if current index data does not contain duplicates.
069         * @param ordered unordered index can be upgraded to ordered.
070         */
071        public abstract void upgrade(Index.Type type, boolean ordered, Comparator<V> comparator);
072        
073        Collection<Handle<ST,PK,S>> getHandles() {
074                checkCorrupt();
075                applyUpdates();
076                try { 
077                        Collection<Handle<ST,PK,S>> ret = new LinkedList<Handle<ST,PK,S>>();
078                        if (isUnique()) {
079                                for (Handle<ST,PK,S> h: getIndexUniqueStore().values()) {
080                                        if (h.isValid()) {
081                                                ret.add(h);
082                                        }
083                                }
084                        } else {
085                                for (Collection<Handle<ST,PK,S>> ch: getIndexMultiStore().values()) {
086                                        for (Handle<ST,PK,S> h: ch) {
087                                                if (h.isValid()) {
088                                                        ret.add(h);
089                                                }
090                                        }
091                                }
092                        }
093                        return ret;
094                } finally {
095                        readLock().unlock();
096                }
097        }
098
099        @SuppressWarnings("unchecked")
100        @Override
101        public Iterator<T> iterator() {
102                checkCorrupt();
103                getStore().readLock().lock();
104                try {
105                        applyUpdates();
106                        try { 
107                                Collection<T> ret = new LinkedList<T>();
108                                if (isUnique()) {
109                                        for (Handle<ST,PK,S> h: getIndexUniqueStore().values()) {
110                                                if (h.isValid()) {
111                                                        ret.add((T) h.get());
112                                                }
113                                        }
114                                } else {
115                                        for (Collection<Handle<ST,PK,S>> ch: getIndexMultiStore().values()) {
116                                                for (Handle<ST,PK,S> h: ch) {
117                                                        if (h.isValid()) {
118                                                                ret.add((T) h.get());
119                                                        }
120                                                }
121                                        }
122                                }
123                                return ret.iterator();
124                        } finally {
125                                readLock().unlock();
126                        }
127                } finally {
128                        getStore().readLock().unlock();
129                }
130        }
131
132        @SuppressWarnings("unchecked")
133        @Override
134        public S getStore() {
135                return (S) store;
136        }
137
138        @Override
139        public Predicate<ST, S> getPredicate() {
140                return predicate;
141        }
142
143        @Override
144        public Extractor<T, V, S> getExtractor() {
145                return extractor;
146        }
147
148        @Override
149        public boolean isUnique() {
150                return Type.UNIQUE.equals(type);
151        }
152
153        @Override
154        public boolean isOrdered() {
155                return ordered;
156        }               
157
158        @Override
159        public Comparator<V> getComparator() {
160                return comparator;
161        }
162        
163        private Comparator<V> naturalComparator = new NaturalComparator<V>();
164        
165        protected abstract Collection<UpdateEntry> getPendingUpdates();
166
167        /**
168         * Updates index
169         * @param changed Changed/added handle.
170         * @param isNew If true, then handle was added, updated otherwise.
171         */
172        public void update(Handle<ST,PK,S> changed, boolean isNew) {
173                checkCorrupt();
174                switch (type) {
175                case UNIQUE:
176                case SYNCHRONOUS:
177                        writeLock().lock();
178                        try {
179                                applyUpdate(changed, isNew);
180                        } finally {
181                                writeLock().unlock();
182                        }
183                        break;
184                case ASYNCHRONOUS:
185                        writeLock().lock();
186                        try {
187                                getPendingUpdates().add(new UpdateEntry(changed, isNew));
188                                getExecutorService().submit(applyUpdatesTask);
189                        } finally {
190                                writeLock().unlock();
191                        }
192                        break;
193                case LAZY:
194                        writeLock().lock();
195                        try {
196                                getPendingUpdates().add(new UpdateEntry(changed, isNew));
197                        } finally {
198                                writeLock().unlock();
199                        }
200                        break;
201                }
202        }
203        
204        protected class UpdateEntry {
205                Handle<ST,PK,S> handle;
206                boolean isNew;
207                
208                public UpdateEntry(Handle<ST, PK, S> handle, boolean isNew) {
209                        super();
210                        this.handle = handle;
211                        this.isNew = isNew;
212                }
213
214                @Override
215                public int hashCode() {
216                        final int prime = 31;
217                        int result = 1;
218                        result = prime * result + ((handle == null) ? 0 : handle.hashCode());
219                        result = prime * result + (isNew ? 1231 : 1237);
220                        return result;
221                }
222
223                @Override
224                public boolean equals(Object obj) {
225                        if (this == obj)
226                                return true;
227                        if (obj == null)
228                                return false;
229                        if (getClass() != obj.getClass())
230                                return false;
231                        UpdateEntry other = (UpdateEntry) obj;
232                        if (handle == null) {
233                                if (other.handle != null)
234                                        return false;
235                        } else if (!handle.equals(other.handle))
236                                return false;
237                        if (isNew != other.isNew)
238                                return false;
239                        return true;
240                }
241
242        }
243        
244        protected abstract ExecutorService getExecutorService();
245        
246        protected void checkCorrupt() {
247                if (isCorrupt.get()) {
248                        throw new StoreException("Index is corrupt");
249                }
250        }
251        
252        boolean isCorrupt() {
253                return isCorrupt.get();
254        }
255        
256        protected Runnable applyUpdatesTask = new Runnable() {
257                
258                public void run() {
259                        writeLock().lock();
260                        try {
261                                Iterator<UpdateEntry> hit = getPendingUpdates().iterator();
262                                while (hit.hasNext()) {
263                                        UpdateEntry updateEntry = hit.next();
264                                        applyUpdate(updateEntry.handle, updateEntry.isNew);
265                                        hit.remove();
266                                }
267                        } catch (Exception e) {
268                                isCorrupt.set(true);
269                                handleAsynchException(e);
270                        } finally {
271                                writeLock().unlock();
272                        }
273                };
274        };
275        
276        
277        /**
278         * Applies pending updates, if any. This method acquires
279         * index write lock. If update operation is successful,
280         * then this method downgrades the lock to read lock.
281         * In the case of exception, write lock is released and
282         * read lock is not acquired.   
283         */
284        private void applyUpdates() {
285                writeLock().lock();
286                try {
287                        Iterator<UpdateEntry> hit = getPendingUpdates().iterator();
288                        while (hit.hasNext()) {
289                                UpdateEntry updateEntry = hit.next();
290                                applyUpdate(updateEntry.handle, updateEntry.isNew);
291                                hit.remove();
292                        }
293                        readLock().lock();
294                } finally {
295                        writeLock().unlock();
296                }
297        }
298        
299        private AtomicInteger granularity = new AtomicInteger();
300        
301        /**
302         * This method is invoked within write lock, no need
303         * to acquire own lock.
304         * @param changed
305         */
306        @SuppressWarnings("unchecked")
307        protected void applyUpdate(Handle<ST,PK,S> changed, boolean isNew) {              
308                if (isUnique()) {
309                        Map<V, Handle<ST, PK, S>> is = getIndexUniqueStore();
310                        if (!isNew) {
311                                is.values().remove(changed);
312                        }
313                        if (predicate==null || changed.extract(predicate)) {
314                                V key = extractor==null ? (V) changed.get() : changed.extract((Extractor<ST, V, S>) extractor);
315                                if (is.containsKey(key)) {
316                                        throw new StoreException("Violation of unique index: "+this);
317                                }
318                                is.put(key, changed);                           
319                        }
320                        granularity.set(is.size());
321                } else {
322                        Map<V, Collection<Handle<ST, PK, S>>> is = getIndexMultiStore();
323                        if (!isNew) {
324                                Iterator<Collection<Handle<ST, PK, S>>> vit = is.values().iterator();
325                                while (vit.hasNext()) {
326                                        Collection<Handle<ST, PK, S>> nextBucket = vit.next();
327                                        if (nextBucket.remove(changed)) {;
328                                                if (nextBucket.isEmpty()) {
329                                                        vit.remove();
330                                                }
331                                        }
332                                }
333                        }
334                        
335                        if (predicate==null || changed.extract(predicate)) {
336                                V key = extractor==null ? (V) changed.get() : changed.extract((Extractor<ST, V, S>) extractor);
337                                Collection<Handle<ST, PK, S>> bucket = is.get(key);
338                                if (bucket == null) {
339                                        bucket = new LinkedList<Store.Handle<ST,PK,S>>();
340                                        is.put(key, bucket);
341                                }
342                                bucket.add(changed);                            
343                        }                       
344                        
345                        granularity.set(getIndexMultiStore().size());                   
346                }
347                
348        }
349        
350        public int getGranularity() {
351                return granularity.get();
352        }
353        
354        /**
355         * Handles exceptions thrown in asynchronous applyUpdates().
356         * @param e
357         */
358        protected abstract void handleAsynchException(Exception e);
359        
360        /**
361         * Removes invalid handles from index store. This method
362         * operates under store's write lock, so it is not necessary
363         * to acquire own lock. 
364         */
365        // TODO - invoke from store cleanup asynchronously.
366        public void cleanup() {
367                if (isUnique()) {
368                        Iterator<Handle<ST, PK, S>> hit = getIndexUniqueStore().values().iterator();
369                        while (hit.hasNext()) {
370                                if (!hit.next().isValid()) {
371                                        hit.remove();
372                                }
373                        }
374                } else {
375                        Iterator<Collection<Handle<ST, PK, S>>> chit = getIndexMultiStore().values().iterator();
376                        while (chit.hasNext()) {
377                                Collection<Handle<ST, PK, S>> ch = chit.next();
378                                Iterator<Handle<ST, PK, S>> hit = ch.iterator();
379                                while (hit.hasNext()) {
380                                        if (!hit.next().isValid()) {
381                                                hit.remove();
382                                        }
383                                }
384                                if (ch.isEmpty()) {
385                                        chit.remove();
386                                }
387                        }
388                }
389        }
390        
391        private Iterable<ST> findMulti(V value) {
392                checkCorrupt();
393                applyUpdates();
394                try {
395                        if (isUnique()) { // For upgraded
396                                Handle<ST, PK, S> ret = getIndexUniqueStore().get(value);
397                                if (ret==null || !ret.isValid()) {
398                                        return Collections.emptyList();
399                                }
400                                return Collections.singleton(ret.get());
401                        }
402                        
403                        Collection<Handle<ST, PK, S>> bucket = getIndexMultiStore().get(value);
404                        if (bucket==null) {
405                                return Collections.emptyList();
406                        }
407                        Collection<ST> ret = new ArrayList<ST>();
408                        for (Handle<ST,PK,S> h: bucket) {
409                                if (h.isValid()) {
410                                        ret.add(h.get());
411                                }
412                        }
413                        return ret;
414                } finally {
415                        readLock().unlock();
416                }
417        }
418        
419        private ST findUnique(V value) {
420                checkCorrupt();
421                applyUpdates();
422                try {
423                        Handle<ST, PK, S> ret = getIndexUniqueStore().get(value);
424                        return ret!=null && ret.isValid() ? ret.get() : null;
425                } finally {
426                        readLock().unlock();
427                }
428        }
429        
430        @SuppressWarnings("unchecked")
431        public Iterable<T> find(V from, V to, boolean fromInclusive, boolean toInclusive) {
432                checkCorrupt();
433                applyUpdates();
434                try {
435                        Collection<T> ret = new LinkedList<T>();
436                        // Step 1 - key set to array.
437                        Collection<V> keys = isUnique() ? getIndexUniqueStore().keySet() : getIndexMultiStore().keySet();
438                        List<V> keyList = keys instanceof List ? (List<V>) keys : new ArrayList<V>(keys);
439                        Comparator<V> theComparator = getComparator()==null ? naturalComparator : getComparator();
440                        int fromIdx = Collections.binarySearch(keyList, from, theComparator);
441                        if (fromIdx<0) {
442                                // -Insertion point - 1
443                                // Inclusiveness doesn't matter
444                                fromIdx = -fromIdx - 1; 
445                        } else {
446                                if (!fromInclusive) {
447                                        ++fromIdx;
448                                }
449                        }
450                        
451                        int toIdx = Collections.binarySearch(keyList, to, theComparator);
452                        if (toIdx<0) {
453                                // -Insertion point - 1
454                                // Inclusiveness doesn't matter
455                                toIdx = -toIdx - 1;                             
456                        } else {
457                                if (!toInclusive) {
458                                        --toIdx;
459                                }
460                        }
461                        
462                        for (int i=fromIdx; i<=toIdx; ++i) {
463                                V currentKey = keyList.get(i);
464                                
465                                if (isUnique()) {
466                                        Handle<ST, PK, S> handle = getIndexUniqueStore().get(currentKey);
467                                        if (handle.isValid()) {
468                                                ret.add((T) handle.get());
469                                        }
470                                } else {
471                                        for (Handle<ST,PK,S> h: getIndexMultiStore().get(currentKey)) {
472                                                if (h.isValid()) {
473                                                        ret.add((T) h.get());
474                                                }
475                                        }
476                                }                               
477                        }
478                        return ret;
479                } finally {
480                        readLock().unlock();
481                }
482        }
483        
484        private InvocationHandler invocationHandler = new InvocationHandler() {
485                
486                @SuppressWarnings("unchecked")
487                @Override
488                public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
489                        if (method.getName().equals("find") && method.getParameterTypes().length==1) {
490                                if (UniqueIndex.class.equals(method.getDeclaringClass())) {
491                                        return findUnique((V) args[0]);
492                                } 
493                                return findMulti((V) args[0]);
494                        }
495                        return method.invoke(AbstractIndex.this, args);
496                }
497        };
498        
499        /**
500         * @return proxy which implements interfaces appropriate for the index type.
501         */
502        @SuppressWarnings("unchecked")
503        public Index<ST, T,PK,V,S> createProxy() {
504                if (isUnique()) {
505                        if (isOrdered()) {
506                                return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {OrderedIndex.class, UniqueIndex.class}, invocationHandler);
507                        } else {
508                                return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {UniqueIndex.class}, invocationHandler);                             
509                        }
510                } else {
511                        if (isOrdered()) {
512                                return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {OrderedIndex.class, MultiIndex.class}, invocationHandler);
513                        } else {
514                                return (Index<ST, T, PK, V, S>) Proxy.newProxyInstance(this.getClass().getClassLoader(), new Class[] {MultiIndex.class}, invocationHandler);                              
515                        }
516                }
517        }
518
519        @Override
520        public String toString() {
521                return "AbstractIndex [predicate=" + predicate + ", extractor="
522                                + extractor + ", type=" + type + ", ordered=" + ordered
523                                + ", comparator=" + comparator + ", isCorrupt=" + isCorrupt
524                                + "]";
525        }
526
527
528        
529}