001package com.hammurapi.common.concurrent; 002 003import java.util.LinkedList; 004import java.util.List; 005import java.util.concurrent.AbstractExecutorService; 006import java.util.concurrent.Callable; 007import java.util.concurrent.ExecutorService; 008import java.util.concurrent.RunnableFuture; 009import java.util.concurrent.TimeUnit; 010import java.util.concurrent.atomic.AtomicLong; 011 012import com.hammurapi.common.HammurapiException; 013 014/** 015 * In-JMV tracking executor service. 016 * @author Pavel Vlasov 017 * 018 */ 019public class LocalTrackingExecutorService extends AbstractExecutorService implements TrackingExecutorService { 020 021 private boolean oneOff; 022 023 private AtomicLong joinDone = new AtomicLong(-1); 024 025 private final AtomicLong TASK_COUNTER = new AtomicLong(); 026 027 private String name; 028 029// private final ThreadLocal<Tracking> contextTask = new ThreadLocal<Tracking>(); 030 031 /** 032 * @param master Master executor. 033 * @param oneOff If true, join can be invoked only once. No tasks can be submitted to the executor once 034 * join() returns. 035 */ 036 public LocalTrackingExecutorService(ExecutorService master, boolean oneOff, String name) { 037 this.master = master; 038 this.oneOff = oneOff; 039 this.name = name; 040 } 041 042 @Override 043 public void shutdown() { 044 master.shutdown(); 045 } 046 047 @Override 048 public List<Runnable> shutdownNow() { 049 return master.shutdownNow(); 050 } 051 052 @Override 053 public boolean isShutdown() { 054 return master.isShutdown(); 055 } 056 057 @Override 058 public boolean isTerminated() { 059 return master.isTerminated(); 060 } 061 062 @Override 063 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 064 return master.awaitTermination(timeout, unit); 065 } 066 067 @Override 068 public void execute(Runnable command) { 069 checkJoinDone(); 070 if (command instanceof DelegatingRunnableFuture) { 071 DelegatingRunnableFuture<Object> drf = (DelegatingRunnableFuture<Object>) command; 072 if (drf.getCommand()!=null) { 073 drf.setTarget(master.submit(new TrackingRunnable(drf.getCommand()), drf.getValue())); 074 } else { 075 drf.setTarget(master.submit((Callable) new TrackingCallable(drf.getTask()))); 076 } 077 } else { 078 master.execute(new TrackingRunnable(command)); 079 } 080 } 081 082 @Override 083 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 084 return new DelegatingRunnableFuture<T>(callable); 085 } 086 087 protected <T extends Object> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 088 return new DelegatingRunnableFuture<T>(runnable, value); 089 }; 090 091 private void checkJoinDone() { 092 if (oneOff && joinDone.get()!=-1) { 093 throw new IllegalStateException("Can't execute any more tasks"); 094 } 095 } 096 097// private Lock serviceLock = new ReentrantLock(); 098 099 private Thread joinThread; 100 101 /** 102 * Executes and removes pending tasks, exits when all tasks are completed. 103 */ 104 @Override 105 public void join() throws InterruptedException { 106 join(0); 107 } 108 109 public boolean join(long timeout) throws InterruptedException { 110 long start = timeout>0 ? System.currentTimeMillis() : 0; 111 checkJoinDone(); 112 joinThread = Thread.currentThread(); 113 while (true) { 114 Runnable toRun = null; 115 boolean hasRunning = false; 116 synchronized (tasks) { 117 for (Tracking task: tasks) { 118 if (task.state==TaskState.PENDING) { 119 toRun = task; 120 break; 121 } else if (task.state==TaskState.RUNNING && task.runner!=Thread.currentThread()) { 122 hasRunning = true; 123 } 124 } 125 if (toRun==null) { 126 if (!hasRunning) { 127 if (oneOff) { 128 joinDone.set(TASK_COUNTER.get()); 129 } 130 return true; 131 } 132 tasks.wait(10); // Wakes up if all tasks finish execution. 133 if (start>0) { 134 if (System.currentTimeMillis()-start>timeout) { 135 return false; 136 } 137 } 138 } 139 } 140 141 if (toRun!=null) { 142 toRun.run(); 143 } 144 } 145 } 146 147 private ExecutorService master; 148 149// private Collection<Thread> runners = new LinkedList<Thread>(); 150 151 // Tasks which are not yet being executed. 152 private LinkedList<Tracking> tasks = new LinkedList<Tracking>(); 153 154 private enum TaskState { 155 PENDING, 156 RUNNING, 157 DONE 158 } 159 160 private abstract class Tracking implements Runnable { 161 volatile TaskState state; 162 volatile Thread runner; 163 } 164 165 private class TrackingRunnable extends Tracking { 166 167 private Runnable master; 168// private final long taskID = TASK_COUNTER.incrementAndGet(); 169// private Tracking submitter = contextTask.get(); 170 171 TrackingRunnable(Runnable master) { 172 this.master = master; 173 synchronized (tasks) { 174 state = TaskState.PENDING; 175 tasks.add(this); 176 } 177 } 178 179 @Override 180 public void run() { 181 synchronized (tasks) { 182 if (state!=TaskState.PENDING) { 183 // Already running. 184 return; 185 } 186 runner = Thread.currentThread(); 187 state = TaskState.RUNNING; 188// contextTask.set(this); 189 } 190 try { 191 master.run(); 192 } finally { 193 synchronized (tasks) { 194 runner = null; 195 state = TaskState.DONE; 196// contextTask.set(null); 197 tasks.remove(this); 198 boolean hasIncompleteTasks = false; 199 for (Tracking task: tasks) { 200 if (task.state!=TaskState.DONE) { 201 hasIncompleteTasks = true; 202 break; 203 } 204 } 205 if (!hasIncompleteTasks) { 206 tasks.notifyAll(); 207 } 208 } 209 } 210 } 211 } 212 213 private class TrackingCallable<V> extends Tracking implements Callable<V> { 214 215 volatile TaskState state; 216 volatile Thread runner; 217 218 private Callable<V> master; 219// private final long taskID = TASK_COUNTER.incrementAndGet(); 220// private TrackingRunnable submitter = contextTask.get(); 221 222 TrackingCallable(Callable<V> master) { 223 this.master = master; 224 synchronized (tasks) { 225 state = TaskState.PENDING; 226 tasks.add(this); 227 } 228 } 229 230 private Object[] result = {null}; 231 232 @Override 233 public V call() throws Exception { 234 synchronized (tasks) { 235 if (state==TaskState.DONE) { 236 return (V) result[0]; 237 } 238 if (state==TaskState.RUNNING) { 239 if (runner == Thread.currentThread()) { 240 throw new IllegalStateException(); 241 } 242 synchronized (result) { 243 result.wait(); 244 return (V) result[0]; 245 } 246 } 247 runner = Thread.currentThread(); 248 state = TaskState.RUNNING; 249// contextTask.set(this); 250 } 251 try { 252 result[0] = master.call(); 253 synchronized (result) { 254 result.notifyAll(); 255 } 256 return (V) result[0]; 257 } finally { 258 synchronized (tasks) { 259 runner = null; 260 state = TaskState.DONE; 261// contextTask.set(null); 262 tasks.remove(this); 263 boolean hasIncompleteTasks = false; 264 for (Tracking task: tasks) { 265 if (task.state!=TaskState.DONE) { 266 hasIncompleteTasks = true; 267 break; 268 } 269 } 270 if (!hasIncompleteTasks) { 271 tasks.notifyAll(); 272 } 273 } 274 } 275 } 276 277 @Override 278 public void run() { 279 try { 280 call(); 281 } catch (Exception e) { 282 throw new HammurapiException(e); 283 } 284 } 285 } 286 287}