001    package com.hammurapi.util.concurrent;
002    
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
008    
009    /**
010     * Future task which cannot block in get() indefinitely. If it is not yet started,
011     * the task gets executed in the caller thread. 
012     * @author Pavel Vlasov
013     *
014     * @param <V>
015     */
016    public class NonBlockingNotifyingFutureTask<V> extends FutureTask<V> implements NotifyingFuture<V> {
017            
018            private static class NotifyingCallable<T> implements Callable<T> {
019                    
020                    private Callable<T> master;
021                    
022                    Collection<CallableListener<T>> listeners = new ArrayList<CallableListener<T>>();
023    
024                    public NotifyingCallable(Callable<T> master) {
025                            this.master = master;
026                    }
027    
028                    public T call() throws Exception {
029                            try {
030                                    T ret = master.call();
031                                    for (CallableListener<T> listener: copy()) {
032                                            listener.onCall(ret);
033                                    }
034                                    return ret;
035                            } catch (Exception e) {
036                                    for (CallableListener<T> listener: copy()) {
037                                            listener.onException(e);
038                                    }
039                                    throw e;
040                            }
041                    }
042                    
043                    private Collection<CallableListener<T>> copy() {
044                            synchronized (listeners) {
045                                    return new ArrayList<CallableListener<T>>(listeners);
046                            }                       
047                    }
048            }
049    
050            private Collection<CallableListener<V>> listeners;
051            private TaskCounter taskCounter;
052    
053            public NonBlockingNotifyingFutureTask(Callable<V> callable, TaskCounter taskCounter) {
054                    super(callable = new NotifyingCallable<V>(callable));     
055                    this.listeners = ((NotifyingCallable<V>) callable).listeners;
056                    if (taskCounter!=null) {
057                            taskCounter.onTaskCreated(this);
058                    }
059                    this.taskCounter = taskCounter;
060            }
061            
062            @Override
063            public void run() {
064                    try {
065                            super.run();
066                    } finally {
067                            if (taskCounter!=null) {
068                                    taskCounter.onTaskFinished(this);
069                            }
070                    }
071            }
072            
073            @Override
074            public V get() throws InterruptedException, ExecutionException {
075                    if (!isDone()) {
076                            run();
077                    }
078                    return super.get();
079            }
080    
081            public void addListener(CallableListener<V> listener) {
082                    synchronized (listeners) {
083                            listeners.add(listener);
084                    }               
085            }
086    
087            public void removeListener(CallableListener<V> listener) {
088                    synchronized (listeners) {
089                            listeners.remove(listener);
090                    }
091            }
092            
093    }