001package com.hammurapi.common.concurrent;
002
003import java.util.ArrayList;
004import java.util.Collection;
005import java.util.concurrent.Callable;
006import java.util.concurrent.ExecutionException;
007import java.util.concurrent.FutureTask;
008
009/**
010 * Future task which cannot block in get() indefinitely. If it is not yet started,
011 * the task gets executed in the caller thread. 
012 * @author Pavel Vlasov
013 *
014 * @param <V>
015 */
016public class NonBlockingNotifyingFutureTask<V> extends FutureTask<V> implements NotifyingFuture<V> {
017        
018        private static class NotifyingCallable<T> implements Callable<T> {
019                
020                private Callable<T> master;
021                
022                Collection<CallableListener<T>> listeners = new ArrayList<CallableListener<T>>();
023
024                public NotifyingCallable(Callable<T> master) {
025                        this.master = master;
026                }
027
028                public T call() throws Exception {
029                        try {
030                                T ret = master.call();
031                                for (CallableListener<T> listener: copy()) {
032                                        listener.onCall(ret);
033                                }
034                                return ret;
035                        } catch (Exception e) {
036                                for (CallableListener<T> listener: copy()) {
037                                        listener.onException(e);
038                                }
039                                throw e;
040                        }
041                }
042                
043                private Collection<CallableListener<T>> copy() {
044                        synchronized (listeners) {
045                                return new ArrayList<CallableListener<T>>(listeners);
046                        }                       
047                }
048        }
049
050        private Collection<CallableListener<V>> listeners;
051        private TaskCounter taskCounter;
052
053        public NonBlockingNotifyingFutureTask(Callable<V> callable, TaskCounter taskCounter) {
054                super(callable = new NotifyingCallable<V>(callable));     
055                this.listeners = ((NotifyingCallable<V>) callable).listeners;
056                if (taskCounter!=null) {
057                        taskCounter.onTaskCreated(this);
058                }
059                this.taskCounter = taskCounter;
060        }
061        
062        @Override
063        public void run() {
064                try {
065                        super.run();
066                } finally {
067                        if (taskCounter!=null) {
068                                taskCounter.onTaskFinished(this);
069                        }
070                }
071        }
072        
073        @Override
074        public V get() throws InterruptedException, ExecutionException {
075                if (!isDone()) {
076                        run();
077                }
078                return super.get();
079        }
080
081        public void addListener(CallableListener<V> listener) {
082                synchronized (listeners) {
083                        listeners.add(listener);
084                }               
085        }
086
087        public void removeListener(CallableListener<V> listener) {
088                synchronized (listeners) {
089                        listeners.remove(listener);
090                }
091        }
092        
093}