001package com.hammurapi.common.concurrent; 002 003import java.util.Collections; 004import java.util.List; 005import java.util.concurrent.AbstractExecutorService; 006import java.util.concurrent.Callable; 007import java.util.concurrent.FutureTask; 008import java.util.concurrent.RunnableFuture; 009import java.util.concurrent.ThreadFactory; 010import java.util.concurrent.TimeUnit; 011import java.util.concurrent.atomic.AtomicBoolean; 012import java.util.logging.Level; 013import java.util.logging.Logger; 014 015/** 016 * This class does not queue jobs - it either executes them immediately in a background thread 017 * or in a caller thread if all background threads are busy. 018 * @author Pavel Vlasov 019 * 020 */ 021public class NonQueueingThreadPoolExecutorService extends AbstractExecutorService { 022 023 private AtomicBoolean shutdown = new AtomicBoolean(); 024 025 private class NonQueueingRunnable implements Runnable { 026 027 private int idx; 028 029 public NonQueueingRunnable(int idx) { 030 this.idx = idx; 031 } 032 033 Runnable[] toRun = {null}; 034 035 @Override 036 public void run() { 037 try { 038 while (true) { 039 synchronized (toRun) { 040 while (toRun[0]==null) { 041 if (shutdown.get()) { 042 toRun.notifyAll(); 043 return; 044 } 045 try { 046 toRun.wait(); 047 } catch (InterruptedException e) { 048 handleInterruptedException(e); 049 } 050 } 051 } 052 try { 053 toRun[0].run(); 054 } catch (Exception e) { 055 handleException(toRun[0], e); 056 } 057 synchronized (toRun) { 058 toRun[0] = null; 059 } 060 } 061 } finally { 062 runnables[idx] = null; 063 } 064 } 065 066 boolean offer(Runnable command) { 067 synchronized (toRun) { 068 if (this.toRun[0]==null) { 069 this.toRun[0]=command; 070 this.toRun.notifyAll(); 071 return true; 072 } 073 return false; 074 } 075 } 076 077 void shutdown() { 078 synchronized (toRun) { 079 toRun.notifyAll(); 080 } 081 } 082 083 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 084 synchronized (toRun) { 085 if (toRun[0]!=null) { 086 if (timeout==0) { 087 toRun.wait(); 088 } else { 089 toRun.wait(unit.toMillis(timeout)); 090 } 091 if (toRun[0]!=null) { 092 return false; 093 } 094 } 095 } 096 return true; 097 } 098 } 099 100 private NonQueueingRunnable[] runnables; 101 102 public NonQueueingThreadPoolExecutorService(ThreadFactory threadFactory, int size) { 103 runnables = new NonQueueingRunnable[size]; 104 for (int i=0; i<runnables.length; ++i) { 105 runnables[i] = new NonQueueingRunnable(i); 106 Thread thread = threadFactory.newThread(runnables[i]); 107 if (thread == null) { 108 runnables[i] = null; 109 } else { 110 thread.start(); 111 } 112 } 113 } 114 115 @Override 116 public void shutdown() { 117 shutdown.set(true); 118 for (NonQueueingRunnable nqr: runnables) { 119 if (nqr!=null) { 120 nqr.shutdown(); 121 } 122 } 123 } 124 125 @Override 126 public List<Runnable> shutdownNow() { 127 shutdown(); 128 return Collections.emptyList(); 129 } 130 131 @Override 132 public boolean isShutdown() { 133 return shutdown.get(); 134 } 135 136 @Override 137 public boolean isTerminated() { 138 for (NonQueueingRunnable nqr: runnables) { 139 synchronized (nqr.toRun) { 140 if (nqr.toRun[0]!=null) { 141 return false; 142 } 143 } 144 } 145 return false; 146 } 147 148 @Override 149 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 150 for (NonQueueingRunnable nqr: runnables) { 151 if (nqr!=null && !nqr.awaitTermination(timeout, unit)) { 152 return false; 153 } 154 } 155 return true; 156 } 157 158 @Override 159 public void execute(Runnable command) { 160 for (NonQueueingRunnable nqr: runnables) { 161 if (nqr.offer(command)) { 162 return; 163 } 164 } 165 onRejectedExecution(command); 166 } 167 168 protected void handleException(Runnable task, Exception e) { 169 LOGGER.log(Level.SEVERE, "Exception in task "+task+": "+e, e); 170 } 171 172 /** 173 * This method is invoked when all threads are busy and cannot accept command for 174 * immediate execution. This implementation executes the command in the caller thread. 175 * @param command 176 */ 177 protected void onRejectedExecution(Runnable command) { 178 command.run(); 179 } 180 181 private static final Logger LOGGER = Logger.getLogger(NonQueueingThreadPoolExecutorService.class.getName()); 182 183 protected void handleInterruptedException(InterruptedException e) { 184 LOGGER.log(Level.SEVERE, "Interrupted exception: "+e, e); 185 } 186 187 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { 188 return new NonBlockingFutureTask<T>(runnable, value); 189 } 190 191 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 192 return new NonBlockingFutureTask<T>(callable); 193 } 194 195}