001package com.hammurapi.common.concurrent;
002
003import java.util.Collections;
004import java.util.List;
005import java.util.concurrent.AbstractExecutorService;
006import java.util.concurrent.Callable;
007import java.util.concurrent.FutureTask;
008import java.util.concurrent.RunnableFuture;
009import java.util.concurrent.ThreadFactory;
010import java.util.concurrent.TimeUnit;
011import java.util.concurrent.atomic.AtomicBoolean;
012import java.util.logging.Level;
013import java.util.logging.Logger;
014
015/**
016 * This class does not queue jobs - it either executes them immediately in a background thread
017 * or in a caller thread if all background threads are busy.
018 * @author Pavel Vlasov
019 *
020 */
021public class NonQueueingThreadPoolExecutorService extends AbstractExecutorService {
022        
023        private AtomicBoolean shutdown = new AtomicBoolean(); 
024        
025        private class NonQueueingRunnable implements Runnable {
026                
027                private int idx;
028
029                public NonQueueingRunnable(int idx) {
030                        this.idx = idx;
031                }
032                
033                Runnable[] toRun = {null};
034
035                @Override
036                public void run() {
037                        try {
038                                while (true) {
039                                        synchronized (toRun) {
040                                                while (toRun[0]==null) {
041                                                        if (shutdown.get()) {
042                                                                toRun.notifyAll();
043                                                                return;
044                                                        }
045                                                        try {
046                                                                toRun.wait();
047                                                        } catch (InterruptedException e) {
048                                                                handleInterruptedException(e);
049                                                        }
050                                                }
051                                        }
052                                        try {
053                                                toRun[0].run();
054                                        } catch (Exception e) {
055                                                handleException(toRun[0], e);
056                                        }
057                                        synchronized (toRun) {
058                                                toRun[0] = null;
059                                        }
060                                }
061                        } finally {
062                                runnables[idx] = null;
063                        }
064                }
065
066                boolean offer(Runnable command) {
067                        synchronized (toRun) {
068                                if (this.toRun[0]==null) {
069                                        this.toRun[0]=command;
070                                        this.toRun.notifyAll();
071                                        return true;
072                                }
073                                return false;
074                        }
075                }
076                
077                void shutdown() {
078                        synchronized (toRun) {
079                                toRun.notifyAll();
080                        }
081                }
082
083                public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
084                        synchronized (toRun) {
085                                if (toRun[0]!=null) {                                   
086                                        if (timeout==0) {
087                                                toRun.wait();
088                                        } else {
089                                                toRun.wait(unit.toMillis(timeout));
090                                        }
091                                        if (toRun[0]!=null) {
092                                                return false;
093                                        }
094                                }
095                        }
096                        return true;
097                }
098        }
099        
100        private NonQueueingRunnable[] runnables;
101        
102        public NonQueueingThreadPoolExecutorService(ThreadFactory threadFactory, int size) {
103                runnables = new NonQueueingRunnable[size];
104                for (int i=0; i<runnables.length; ++i) {
105                        runnables[i] = new NonQueueingRunnable(i);
106                        Thread thread = threadFactory.newThread(runnables[i]);
107                        if (thread == null) {
108                                runnables[i] = null;
109                        } else {
110                                thread.start();
111                        }
112                }
113        }
114
115        @Override
116        public void shutdown() {                
117                shutdown.set(true);
118                for (NonQueueingRunnable nqr: runnables) {
119                        if (nqr!=null) {
120                                nqr.shutdown();
121                        }
122                }
123        }
124
125        @Override
126        public List<Runnable> shutdownNow() {
127                shutdown();
128                return Collections.emptyList();
129        }
130
131        @Override
132        public boolean isShutdown() {
133                return shutdown.get();
134        }
135
136        @Override
137        public boolean isTerminated() {
138                for (NonQueueingRunnable nqr: runnables) {
139                        synchronized (nqr.toRun) {
140                                if (nqr.toRun[0]!=null) {
141                                        return false;
142                                }
143                        }
144                }
145                return false;
146        }
147
148        @Override
149        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
150                for (NonQueueingRunnable nqr: runnables) {
151                        if (nqr!=null && !nqr.awaitTermination(timeout, unit)) {
152                                return false;
153                        }
154                }
155                return true;
156        }
157
158        @Override
159        public void execute(Runnable command) {
160                for (NonQueueingRunnable nqr: runnables) {
161                        if (nqr.offer(command)) {
162                                return;
163                        }
164                }
165                onRejectedExecution(command);
166        }
167        
168        protected void handleException(Runnable task, Exception e) {
169                LOGGER.log(Level.SEVERE, "Exception in task "+task+": "+e, e);
170        }
171
172        /**
173         * This method is invoked when all threads are busy and cannot accept command for
174         * immediate execution. This implementation executes the command in the caller thread.
175         * @param command
176         */
177        protected void onRejectedExecution(Runnable command) {
178                command.run();
179        }
180        
181        private static final Logger LOGGER = Logger.getLogger(NonQueueingThreadPoolExecutorService.class.getName());
182
183        protected void handleInterruptedException(InterruptedException e) {
184                LOGGER.log(Level.SEVERE, "Interrupted exception: "+e, e);
185        }
186        
187    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
188        return new NonBlockingFutureTask<T>(runnable, value);
189    }
190
191    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
192        return new NonBlockingFutureTask<T>(callable);
193    }
194                
195}