1 | package com.hammurapi.common.concurrent; |
2 | |
3 | import java.util.LinkedList; |
4 | import java.util.List; |
5 | import java.util.concurrent.AbstractExecutorService; |
6 | import java.util.concurrent.ExecutorService; |
7 | import java.util.concurrent.TimeUnit; |
8 | import java.util.concurrent.atomic.AtomicLong; |
9 | |
10 | /** |
11 | * In-JMV tracking executor service. |
12 | * @author Pavel Vlasov |
13 | * |
14 | */ |
15 | public class LocalTrackingExecutorService extends AbstractExecutorService implements TrackingExecutorService { |
16 | |
17 | private boolean oneOff; |
18 | |
19 | private AtomicLong joinDone = new AtomicLong(-1); |
20 | |
21 | private final AtomicLong TASK_COUNTER = new AtomicLong(); |
22 | |
23 | private String name; |
24 | |
25 | private final ThreadLocal<TrackingRunnable> contextTask = new ThreadLocal<TrackingRunnable>(); |
26 | |
27 | /** |
28 | * @param master Master executor. |
29 | * @param oneOff If true, join can be invoked only once. No tasks can be submitted to the executor once |
30 | * join() returns. |
31 | */ |
32 | public LocalTrackingExecutorService(ExecutorService master, boolean oneOff, String name) { |
33 | this.master = master; |
34 | this.oneOff = oneOff; |
35 | this.name = name; |
36 | } |
37 | |
38 | @Override |
39 | public void shutdown() { |
40 | master.shutdown(); |
41 | } |
42 | |
43 | @Override |
44 | public List<Runnable> shutdownNow() { |
45 | return master.shutdownNow(); |
46 | } |
47 | |
48 | @Override |
49 | public boolean isShutdown() { |
50 | return master.isShutdown(); |
51 | } |
52 | |
53 | @Override |
54 | public boolean isTerminated() { |
55 | return master.isTerminated(); |
56 | } |
57 | |
58 | @Override |
59 | public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { |
60 | return master.awaitTermination(timeout, unit); |
61 | } |
62 | |
63 | @Override |
64 | public void execute(Runnable command) { |
65 | checkJoinDone(); |
66 | master.execute(new TrackingRunnable(command)); |
67 | } |
68 | |
69 | private void checkJoinDone() { |
70 | if (oneOff && joinDone.get()!=-1) { |
71 | throw new IllegalStateException("Can't execute any more tasks"); |
72 | } |
73 | } |
74 | |
75 | // private Lock serviceLock = new ReentrantLock(); |
76 | |
77 | private Thread joinThread; |
78 | |
79 | /** |
80 | * Executes and removes pending tasks, exits when all tasks are completed. |
81 | */ |
82 | @Override |
83 | public void join() throws InterruptedException { |
84 | join(0); |
85 | } |
86 | |
87 | public boolean join(long timeout) throws InterruptedException { |
88 | long start = timeout>0 ? System.currentTimeMillis() : 0; |
89 | checkJoinDone(); |
90 | joinThread = Thread.currentThread(); |
91 | while (true) { |
92 | TrackingRunnable toRun = null; |
93 | boolean hasRunning = false; |
94 | synchronized (tasks) { |
95 | for (TrackingRunnable task: tasks) { |
96 | if (task.state==TaskState.PENDING) { |
97 | toRun = task; |
98 | break; |
99 | } else if (task.state==TaskState.RUNNING && task.runner!=Thread.currentThread()) { |
100 | hasRunning = true; |
101 | } |
102 | } |
103 | if (toRun==null) { |
104 | if (!hasRunning) { |
105 | if (oneOff) { |
106 | joinDone.set(TASK_COUNTER.get()); |
107 | } |
108 | return true; |
109 | } |
110 | tasks.wait(10); // Wakes up if all tasks finish execution. |
111 | if (start>0) { |
112 | if (System.currentTimeMillis()-start>timeout) { |
113 | return false; |
114 | } |
115 | } |
116 | } |
117 | } |
118 | |
119 | if (toRun!=null) { |
120 | toRun.run(); |
121 | } |
122 | } |
123 | } |
124 | |
125 | private ExecutorService master; |
126 | |
127 | // private Collection<Thread> runners = new LinkedList<Thread>(); |
128 | |
129 | // Tasks which are not yet being executed. |
130 | private LinkedList<TrackingRunnable> tasks = new LinkedList<TrackingRunnable>(); |
131 | |
132 | private enum TaskState { |
133 | PENDING, |
134 | RUNNING, |
135 | DONE |
136 | } |
137 | |
138 | private class TrackingRunnable implements Runnable { |
139 | |
140 | volatile TaskState state; |
141 | volatile Thread runner; |
142 | |
143 | private Runnable master; |
144 | private final long taskID = TASK_COUNTER.incrementAndGet(); |
145 | private TrackingRunnable submitter = contextTask.get(); |
146 | |
147 | TrackingRunnable(Runnable master) { |
148 | this.master = master; |
149 | synchronized (tasks) { |
150 | state = TaskState.PENDING; |
151 | tasks.add(this); |
152 | } |
153 | } |
154 | |
155 | @Override |
156 | public void run() { |
157 | synchronized (tasks) { |
158 | if (state!=TaskState.PENDING) { |
159 | return; |
160 | } |
161 | runner = Thread.currentThread(); |
162 | state = TaskState.RUNNING; |
163 | contextTask.set(this); |
164 | } |
165 | try { |
166 | master.run(); |
167 | } finally { |
168 | synchronized (tasks) { |
169 | runner = null; |
170 | state = TaskState.DONE; |
171 | contextTask.set(null); |
172 | tasks.remove(this); |
173 | boolean hasIncompleteTasks = false; |
174 | for (TrackingRunnable task: tasks) { |
175 | if (task.state!=TaskState.DONE) { |
176 | hasIncompleteTasks = true; |
177 | break; |
178 | } |
179 | } |
180 | if (!hasIncompleteTasks) { |
181 | tasks.notifyAll(); |
182 | } |
183 | } |
184 | } |
185 | } |
186 | } |
187 | |
188 | } |