001package com.hammurapi.common.concurrent;
002
003import java.util.LinkedList;
004import java.util.List;
005import java.util.concurrent.AbstractExecutorService;
006import java.util.concurrent.Callable;
007import java.util.concurrent.ExecutorService;
008import java.util.concurrent.RunnableFuture;
009import java.util.concurrent.TimeUnit;
010import java.util.concurrent.atomic.AtomicLong;
011
012import com.hammurapi.common.HammurapiException;
013
014/**
015 * In-JMV tracking executor service.
016 * @author Pavel Vlasov
017 *
018 */
019public class LocalTrackingExecutorService extends AbstractExecutorService implements TrackingExecutorService {
020        
021        private boolean oneOff;
022        
023        private AtomicLong joinDone = new AtomicLong(-1);
024        
025        private final AtomicLong TASK_COUNTER = new AtomicLong();
026
027        private String name;
028        
029//      private final ThreadLocal<Tracking> contextTask = new ThreadLocal<Tracking>(); 
030                
031        /**
032         * @param master Master executor.
033         * @param oneOff If true, join can be invoked only once. No tasks can be submitted to the executor once
034         * join() returns.
035         */
036        public LocalTrackingExecutorService(ExecutorService master, boolean oneOff, String name) {
037                this.master = master;
038                this.oneOff = oneOff;
039                this.name = name;
040        }
041
042        @Override
043        public void shutdown() {
044                master.shutdown();
045        }
046
047        @Override
048        public List<Runnable> shutdownNow() {
049                return master.shutdownNow();
050        }
051
052        @Override
053        public boolean isShutdown() {
054                return master.isShutdown();
055        }
056
057        @Override
058        public boolean isTerminated() {
059                return master.isTerminated();
060        }
061
062        @Override
063        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
064                return master.awaitTermination(timeout, unit);
065        }
066
067        @Override
068        public void execute(Runnable command) {
069                checkJoinDone();                
070                if (command instanceof DelegatingRunnableFuture) {
071                        DelegatingRunnableFuture<Object> drf = (DelegatingRunnableFuture<Object>) command;
072                        if (drf.getCommand()!=null) {
073                                drf.setTarget(master.submit(new TrackingRunnable(drf.getCommand()), drf.getValue()));
074                        } else {
075                                drf.setTarget(master.submit((Callable) new TrackingCallable(drf.getTask())));
076                        }
077                } else {
078                        master.execute(new TrackingRunnable(command));
079                }
080        }
081        
082        @Override
083        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
084                return new DelegatingRunnableFuture<T>(callable);
085        }
086        
087        protected <T extends Object> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
088                return new DelegatingRunnableFuture<T>(runnable, value);          
089        };
090
091        private void checkJoinDone() {
092                if (oneOff && joinDone.get()!=-1) {
093                        throw new IllegalStateException("Can't execute any more tasks");
094                }
095        }
096        
097//      private Lock serviceLock = new ReentrantLock();
098        
099        private Thread joinThread;
100
101        /**
102         * Executes and removes pending tasks, exits when all tasks are completed.
103         */
104        @Override
105        public void join() throws InterruptedException {
106                join(0);
107        }
108        
109        public boolean join(long timeout) throws InterruptedException {
110                long start = timeout>0 ? System.currentTimeMillis() : 0;
111                checkJoinDone();                
112                joinThread = Thread.currentThread();
113                while (true) {
114                        Runnable toRun = null;
115                        boolean hasRunning = false;
116                        synchronized (tasks) {
117                                for (Tracking task: tasks) {
118                                        if (task.state==TaskState.PENDING) {
119                                                toRun = task;
120                                                break;
121                                        } else if (task.state==TaskState.RUNNING && task.runner!=Thread.currentThread()) {
122                                                hasRunning = true;
123                                        }
124                                }
125                                if (toRun==null) {
126                                        if (!hasRunning) {
127                                                if (oneOff) {
128                                                        joinDone.set(TASK_COUNTER.get());
129                                                }
130                                                return true;
131                                        }
132                                        tasks.wait(10); // Wakes up if all tasks finish execution.
133                                        if (start>0) {
134                                                if (System.currentTimeMillis()-start>timeout) {
135                                                        return false;
136                                                }
137                                        }
138                                }
139                        }
140                        
141                        if (toRun!=null) {
142                                toRun.run();                    
143                        }
144                }
145        }
146        
147        private ExecutorService master; 
148        
149//      private Collection<Thread> runners = new LinkedList<Thread>();
150        
151        // Tasks which are not yet being executed.
152        private LinkedList<Tracking> tasks = new LinkedList<Tracking>();
153        
154        private enum TaskState {
155                PENDING,
156                RUNNING,
157                DONE
158        }
159        
160        private abstract class Tracking implements Runnable {
161                volatile TaskState state;
162                volatile Thread runner;                         
163        }
164        
165        private class TrackingRunnable extends Tracking {
166                
167                private Runnable master;
168//              private final long taskID = TASK_COUNTER.incrementAndGet(); 
169//              private Tracking submitter = contextTask.get();
170                
171                TrackingRunnable(Runnable master) {                     
172                        this.master = master;
173                        synchronized (tasks) {
174                                state = TaskState.PENDING;
175                                tasks.add(this);
176                        }
177                }
178                
179                @Override
180                public void run() {
181                        synchronized (tasks) {
182                                if (state!=TaskState.PENDING) {
183                                        // Already running.
184                                        return;
185                                }
186                                runner = Thread.currentThread();
187                                state = TaskState.RUNNING;
188//                              contextTask.set(this);
189                        }
190                        try {
191                                master.run();
192                        } finally {
193                                synchronized (tasks) {
194                                        runner = null;
195                                        state = TaskState.DONE;
196//                                      contextTask.set(null);
197                                        tasks.remove(this);
198                                        boolean hasIncompleteTasks = false;
199                                        for (Tracking task: tasks) {
200                                                if (task.state!=TaskState.DONE) {
201                                                        hasIncompleteTasks = true;
202                                                        break;
203                                                }
204                                        }
205                                        if (!hasIncompleteTasks) {
206                                                tasks.notifyAll();
207                                        }
208                                }
209                        }
210                }
211        }
212
213        private class TrackingCallable<V> extends Tracking implements Callable<V> {
214                
215                volatile TaskState state;
216                volatile Thread runner;
217                
218                private Callable<V> master;
219//              private final long taskID = TASK_COUNTER.incrementAndGet(); 
220//              private TrackingRunnable submitter = contextTask.get();
221                
222                TrackingCallable(Callable<V> master) {                    
223                        this.master = master;
224                        synchronized (tasks) {
225                                state = TaskState.PENDING;
226                                tasks.add(this);
227                        }
228                }
229                
230                private Object[] result = {null};
231                
232                @Override
233                public V call() throws Exception {
234                        synchronized (tasks) {
235                                if (state==TaskState.DONE) {
236                                        return (V) result[0];
237                                }
238                                if (state==TaskState.RUNNING) {
239                                        if (runner == Thread.currentThread()) {
240                                                throw new IllegalStateException();
241                                        }
242                                        synchronized (result) {
243                                                result.wait();
244                                                return (V) result[0];
245                                        }
246                                }
247                                runner = Thread.currentThread();
248                                state = TaskState.RUNNING;
249//                              contextTask.set(this);
250                        }
251                        try {
252                                result[0] = master.call();
253                                synchronized (result) {
254                                        result.notifyAll();
255                                }
256                                return (V) result[0];
257                        } finally {
258                                synchronized (tasks) {
259                                        runner = null;
260                                        state = TaskState.DONE;
261//                                      contextTask.set(null);
262                                        tasks.remove(this);
263                                        boolean hasIncompleteTasks = false;
264                                        for (Tracking task: tasks) {
265                                                if (task.state!=TaskState.DONE) {
266                                                        hasIncompleteTasks = true;
267                                                        break;
268                                                }
269                                        }
270                                        if (!hasIncompleteTasks) {
271                                                tasks.notifyAll();
272                                        }
273                                }
274                        }
275                }
276
277                @Override
278                public void run() {
279                        try {
280                                call();
281                        } catch (Exception e) {
282                                throw new HammurapiException(e); 
283                        }                       
284                }
285        }
286        
287}