1 | package com.hammurapi.common.concurrent; |
2 | |
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.atomic.AtomicBoolean;
8 | |
9 | /** |
10 | * Future task which cannot block in get() indefinitely. If it is not yet started, |
11 | * the task gets executed in the caller thread. |
12 | * @author Pavel Vlasov |
13 | * |
14 | * @param <V> |
15 | */ |
16 | public class NonBlockingNotifyingFutureTask<V> extends FutureTask<V> implements NotifyingFuture<V> { |
17 | |
18 | private static class NotifyingCallable<T> implements Callable<T> { |
19 | |
20 | private Callable<T> master; |
21 | |
22 | Collection<CallableListener<T>> listeners = new ArrayList<CallableListener<T>>(); |
23 | |
24 | public NotifyingCallable(Callable<T> master) { |
25 | this.master = master; |
26 | } |
27 | |
28 | public T call() throws Exception { |
29 | try { |
30 | T ret = master.call(); |
31 | for (CallableListener<T> listener: copy()) { |
32 | listener.onCall(ret); |
33 | } |
34 | return ret; |
35 | } catch (Exception e) { |
36 | for (CallableListener<T> listener: copy()) { |
37 | listener.onException(e); |
38 | } |
39 | throw e; |
40 | } |
41 | } |
42 | |
43 | private Collection<CallableListener<T>> copy() { |
44 | synchronized (listeners) { |
45 | return new ArrayList<CallableListener<T>>(listeners); |
46 | } |
47 | } |
48 | } |
49 | |
50 | private Collection<CallableListener<V>> listeners; |
51 | private TaskCounter taskCounter; |
52 | |
53 | public NonBlockingNotifyingFutureTask(Callable<V> callable, TaskCounter taskCounter) { |
54 | super(callable = new NotifyingCallable<V>(callable)); |
55 | this.listeners = ((NotifyingCallable<V>) callable).listeners; |
56 | if (taskCounter!=null) { |
57 | taskCounter.onTaskCreated(this); |
58 | } |
59 | this.taskCounter = taskCounter; |
60 | } |
61 | |
62 | @Override |
63 | public void run() { |
64 | try { |
65 | super.run(); |
66 | } finally { |
67 | if (taskCounter!=null) { |
68 | taskCounter.onTaskFinished(this); |
69 | } |
70 | } |
71 | } |
72 | |
73 | @Override |
74 | public V get() throws InterruptedException, ExecutionException { |
75 | if (!isDone()) { |
76 | run(); |
77 | } |
78 | return super.get(); |
79 | } |
80 | |
81 | public void addListener(CallableListener<V> listener) { |
82 | synchronized (listeners) { |
83 | listeners.add(listener); |
84 | } |
85 | } |
86 | |
87 | public void removeListener(CallableListener<V> listener) { |
88 | synchronized (listeners) { |
89 | listeners.remove(listener); |
90 | } |
91 | } |
92 | |
93 | } |