001package com.hammurapi.common.concurrent; 002 003import java.util.ArrayList; 004import java.util.Collection; 005import java.util.concurrent.Callable; 006import java.util.concurrent.ExecutionException; 007import java.util.concurrent.FutureTask; 008 009/** 010 * Future task which cannot block in get() indefinitely. If it is not yet started, 011 * the task gets executed in the caller thread. 012 * @author Pavel Vlasov 013 * 014 * @param <V> 015 */ 016public class NonBlockingNotifyingFutureTask<V> extends FutureTask<V> implements NotifyingFuture<V> { 017 018 private static class NotifyingCallable<T> implements Callable<T> { 019 020 private Callable<T> master; 021 022 Collection<CallableListener<T>> listeners = new ArrayList<CallableListener<T>>(); 023 024 public NotifyingCallable(Callable<T> master) { 025 this.master = master; 026 } 027 028 public T call() throws Exception { 029 try { 030 T ret = master.call(); 031 for (CallableListener<T> listener: copy()) { 032 listener.onCall(ret); 033 } 034 return ret; 035 } catch (Exception e) { 036 for (CallableListener<T> listener: copy()) { 037 listener.onException(e); 038 } 039 throw e; 040 } 041 } 042 043 private Collection<CallableListener<T>> copy() { 044 synchronized (listeners) { 045 return new ArrayList<CallableListener<T>>(listeners); 046 } 047 } 048 } 049 050 private Collection<CallableListener<V>> listeners; 051 private TaskCounter taskCounter; 052 053 public NonBlockingNotifyingFutureTask(Callable<V> callable, TaskCounter taskCounter) { 054 super(callable = new NotifyingCallable<V>(callable)); 055 this.listeners = ((NotifyingCallable<V>) callable).listeners; 056 if (taskCounter!=null) { 057 taskCounter.onTaskCreated(this); 058 } 059 this.taskCounter = taskCounter; 060 } 061 062 @Override 063 public void run() { 064 try { 065 super.run(); 066 } finally { 067 if (taskCounter!=null) { 068 taskCounter.onTaskFinished(this); 069 } 070 } 071 } 072 073 @Override 074 public V get() throws InterruptedException, ExecutionException { 075 if (!isDone()) { 076 run(); 077 } 078 return super.get(); 079 } 080 081 public void addListener(CallableListener<V> listener) { 082 synchronized (listeners) { 083 listeners.add(listener); 084 } 085 } 086 087 public void removeListener(CallableListener<V> listener) { 088 synchronized (listeners) { 089 listeners.remove(listener); 090 } 091 } 092 093}