001 package com.hammurapi.util.concurrent;
002
003 import java.util.ArrayList;
004 import java.util.Collection;
005 import java.util.concurrent.Callable;
006 import java.util.concurrent.ExecutionException;
007 import java.util.concurrent.FutureTask;
008
009 /**
010 * Future task which cannot block in get() indefinitely. If it is not yet started,
011 * the task gets executed in the caller thread.
012 * @author Pavel Vlasov
013 *
014 * @param <V>
015 */
016 public class NonBlockingNotifyingFutureTask<V> extends FutureTask<V> implements NotifyingFuture<V> {
017
018 private static class NotifyingCallable<T> implements Callable<T> {
019
020 private Callable<T> master;
021
022 Collection<CallableListener<T>> listeners = new ArrayList<CallableListener<T>>();
023
024 public NotifyingCallable(Callable<T> master) {
025 this.master = master;
026 }
027
028 public T call() throws Exception {
029 try {
030 T ret = master.call();
031 for (CallableListener<T> listener: copy()) {
032 listener.onCall(ret);
033 }
034 return ret;
035 } catch (Exception e) {
036 for (CallableListener<T> listener: copy()) {
037 listener.onException(e);
038 }
039 throw e;
040 }
041 }
042
043 private Collection<CallableListener<T>> copy() {
044 synchronized (listeners) {
045 return new ArrayList<CallableListener<T>>(listeners);
046 }
047 }
048 }
049
050 private Collection<CallableListener<V>> listeners;
051 private TaskCounter taskCounter;
052
053 public NonBlockingNotifyingFutureTask(Callable<V> callable, TaskCounter taskCounter) {
054 super(callable = new NotifyingCallable<V>(callable));
055 this.listeners = ((NotifyingCallable<V>) callable).listeners;
056 if (taskCounter!=null) {
057 taskCounter.onTaskCreated(this);
058 }
059 this.taskCounter = taskCounter;
060 }
061
062 @Override
063 public void run() {
064 try {
065 super.run();
066 } finally {
067 if (taskCounter!=null) {
068 taskCounter.onTaskFinished(this);
069 }
070 }
071 }
072
073 @Override
074 public V get() throws InterruptedException, ExecutionException {
075 if (!isDone()) {
076 run();
077 }
078 return super.get();
079 }
080
081 public void addListener(CallableListener<V> listener) {
082 synchronized (listeners) {
083 listeners.add(listener);
084 }
085 }
086
087 public void removeListener(CallableListener<V> listener) {
088 synchronized (listeners) {
089 listeners.remove(listener);
090 }
091 }
092
093 }