001package com.hammurapi.common.concurrent.transactions; 002 003import java.util.ArrayList; 004import java.util.Collection; 005import java.util.Iterator; 006import java.util.List; 007import java.util.Map; 008import java.util.concurrent.AbstractExecutorService; 009import java.util.concurrent.Callable; 010import java.util.concurrent.ConcurrentHashMap; 011import java.util.concurrent.ExecutorService; 012import java.util.concurrent.Future; 013import java.util.concurrent.TimeUnit; 014import java.util.concurrent.atomic.AtomicInteger; 015import java.util.logging.Level; 016import java.util.logging.Logger; 017 018import javax.transaction.NotSupportedException; 019import javax.transaction.Status; 020import javax.transaction.SystemException; 021import javax.transaction.Transaction; 022import javax.transaction.xa.XAException; 023import javax.transaction.xa.XAResource; 024 025import com.hammurapi.common.Context; 026import com.hammurapi.common.MutableContext; 027import com.hammurapi.common.SimpleMutableContext; 028import com.hammurapi.common.concurrent.CompositeLogger; 029import com.hammurapi.common.concurrent.LocalTrackingExecutorService; 030import com.hammurapi.common.concurrent.LoggerWrapper; 031import com.hammurapi.common.concurrent.PropertySet; 032import com.hammurapi.extract.Predicate; 033 034/** 035 * This class performs concurrent transactional computations. 036 * @author Pavel Vlasov 037 * 038 * @param <KP> 039 */ 040public abstract class AbstractTransactionalProcessor<KP> { 041 042 /** 043 * Interface to propagate transactions between threads. 044 * @author Pavel Vlasov 045 * 046 */ 047 protected interface TransactionPropagation { 048 049 /** 050 * This method is invoked in the thread where transaction shall be propagated to. 051 * @return Propagated transaction. 052 */ 053 Transaction getTransaction(); 054 055 /** 056 * Disassociates transaction from the caller thread. 057 */ 058 void dispose(); 059 } 060 061 /** 062 * Propagates transaction object AS-IS 063 * @author Pavel Vlasov 064 * 065 */ 066 protected class SimpleTransactionPropagation implements TransactionPropagation { 067 068 private Transaction transaction; 069 070 public SimpleTransactionPropagation(Transaction transaction) { 071 this.transaction = transaction; 072 } 073 074 @Override 075 public Transaction getTransaction() { 076 return transaction; 077 } 078 079 @Override 080 public void dispose() { 081 // NOP 082 } 083 084 } 085 086 /** 087 * Creates transaction propagation. This implementation creates SimpleTransactionPropagation 088 * @param transaction 089 * @return 090 */ 091 protected TransactionPropagation createTransactionPropagation(Transaction transaction) { 092 return new SimpleTransactionPropagation(transaction); 093 } 094 095 protected ThreadLocal<Transaction> threadTransaction = new ThreadLocal<Transaction>(); 096 097 /** 098 * @return Thread's context transaction. 099 */ 100 public Transaction getCurrentTransaction() { 101 return threadTransaction.get(); 102 } 103 104 private class TransactionalRunnable implements Runnable { 105 106 private TransactionPropagation transactionPropagation; 107 private Runnable master; 108 109 public TransactionalRunnable(Runnable master) { 110 this.master = master; 111 this.transactionPropagation = createTransactionPropagation(threadTransaction.get()); 112 } 113 114 @Override 115 public void run() { 116 Transaction transaction = transactionPropagation.getTransaction(); 117 Transaction prevTransaction = threadTransaction.get(); 118 threadTransaction.set(transaction); 119 try { 120 master.run(); 121 } catch (RuntimeException e) { 122 LOGGER.log(Level.SEVERE, "Task "+master+" execution failed, setting transaction to rollback only: "+e, e); 123 if (transaction!=null) { 124 try { 125 transaction.setRollbackOnly(); 126 } catch (SystemException se) { 127 LOGGER.log(Level.SEVERE, "Transactional execution failed: "+se, se); 128 throw new ProcessingException(se); 129 } 130 } 131 throw e; 132 } finally { 133 threadTransaction.set(prevTransaction); 134 transactionPropagation.dispose(); 135 } 136 } 137 } 138 139 140 ExecutorService executorService; 141 142 protected abstract PropertySet<KP> createPropertySet(ExecutorService executorService, Context context, PropertySet<KP>... chain); 143 144 public PropertySet<KP> createPropertySet(Context context, PropertySet<KP>... chain) { 145 return createPropertySet(executorService, context, chain); 146 } 147 148 static final Logger LOGGER = Logger.getLogger(AbstractTransactionalProcessor.class.getName()); 149 150 static final ProgressMonitor NOP_PROGRESS_MONITOR = new ProgressMonitor() { 151 152 @Override 153 public void onProgress(int completed, int active, int pending, String message) { 154 // NOP 155 156 } 157 }; 158 159 private ProgressMonitorFactory progressMonitorFactory; 160 161 public void setProgressMonitorFactory(ProgressMonitorFactory progressMonitorFactory) { 162 this.progressMonitorFactory = progressMonitorFactory; 163 } 164 165 protected ProgressMonitor createProgressMonitor(String name) { 166 if (progressMonitorFactory==null) { 167 return NOP_PROGRESS_MONITOR; 168 } 169 return progressMonitorFactory.createProgressMonitor(name); 170 } 171 172 private class ExecutionContextImpl<V> implements ExecutionContext<KP>, Callable<V> { 173 174 private Undoer undoer; 175 private Command<KP, V> command; 176 private PropertySet<KP> propertySet; 177 private Object[] args; 178 private ExecutorService executorService; 179 private String resourceIdPrefix; 180 private AtomicInteger resourceCounter; 181 private Map<String, XAResource> resourceMap; 182 private MutableContext context; 183 private com.hammurapi.common.concurrent.Logger logger; 184 185 ExecutionContextImpl( 186 Undoer undoer, 187 Command<KP,V> command, 188 Context chain, 189 PropertySet<KP> propertySet, 190 Object[] args, 191 ExecutorService executorService, 192 String resourceIdPrefix, 193 AtomicInteger resourceCounter, 194 Map<String, XAResource> resourceMap) { 195 196 this.undoer = undoer; 197 this.command = command; 198 this.context = AbstractTransactionalProcessor.this.createContext(this, chain); 199 this.propertySet = propertySet; 200 this.args = args; 201 this.executorService = executorService; 202 context.register(ExecutorService.class, executorService); 203 this.resourceIdPrefix = resourceIdPrefix; 204 this.resourceCounter = resourceCounter; 205 this.resourceMap = resourceMap; 206 207 final Collection<com.hammurapi.common.concurrent.Logger> contextLoggers = new ArrayList<com.hammurapi.common.concurrent.Logger>(); 208 Iterator<com.hammurapi.common.concurrent.Logger> lit = this.context.lookupAll(com.hammurapi.common.concurrent.Logger.class); 209 while (lit.hasNext()) { 210 contextLoggers.add(lit.next().getLogger(command.toString())); 211 } 212 this.logger = new CompositeLogger(contextLoggers.iterator()); 213 214 } 215 216 @Override 217 public Transaction getTransaction() { 218 return getCurrentTransaction(); 219 } 220 221 @Override 222 public Object lookup(String name) { 223 return context.lookup(name); 224 } 225 226 @Override 227 public <T> T lookup(Class<T> serviceClass, Predicate<T, Context>... selectors) { 228 return context.lookup(serviceClass, selectors); 229 } 230 231 @Override 232 public <T> Iterator<T> lookupAll(Class<T> serviceClass, Predicate<T, Context>... selectors) { 233 return context.lookupAll(serviceClass, selectors); 234 } 235 236 @Override 237 public void addUndo(Runnable undoTask) { 238 try { 239 undoer.addUndoTask(undoTask); 240 } catch (XAException e) { 241 getLogger().log(Level.SEVERE, "Failed to add undo task: "+e, e); 242 try { 243 getTransaction().setRollbackOnly(); 244 } catch (IllegalStateException ise) { 245 getLogger().log(Level.SEVERE, "Sub-transaction rollback failed: "+ise, ise); 246 } catch (SystemException se) { 247 getLogger().log(Level.SEVERE, "Sub-transaction rollback failed: "+se, se); 248 } 249 } 250 } 251 252 @Override 253 public PropertySet<KP> createPropertySet(PropertySet<KP>... chain) { 254 return AbstractTransactionalProcessor.this.createPropertySet(executorService, this, chain); 255 } 256 257 @Override 258 public V call() throws Exception { 259 return command.execute(this, propertySet, args); 260 } 261 262 @Override 263 public <U> Future<U> execute(final Command<KP, U> command, Context context, PropertySet<KP> propertySet, Object... args) { 264 ExecutionContextImpl<U> ec = new ExecutionContextImpl<U>( 265 undoer, 266 command, 267 context, 268 propertySet, 269 args, 270 executorService, 271 resourceIdPrefix, 272 resourceCounter, 273 resourceMap); 274 return lookup(ExecutorService.class).submit(ec); 275 } 276 277 @Override 278 public void yield() { 279 throw new UnsupportedOperationException(); 280 } 281 282 @Override 283 public ProgressMonitor createProgressMonitor(String name) { 284 return AbstractTransactionalProcessor.this.createProgressMonitor(name); 285 } 286 287 @Override 288 public void registerXAResource(XAResource xaResource) { 289 String uniqueName = resourceIdPrefix+"_"+Integer.toString(resourceCounter.incrementAndGet(), Character.MAX_RADIX); 290 if (resourceMap.put(uniqueName, xaResource)!=null) { 291 throw new ProcessingException("Duplicate resource name: "+uniqueName); 292 } 293 AbstractTransactionalProcessor.this.registerXAResource(uniqueName, xaResource); 294 } 295 296 @Override 297 public MutableContext createContext() { 298 return new SimpleMutableContext(this); 299 } 300 301 @Override 302 public com.hammurapi.common.concurrent.Logger getLogger() { 303 return logger; 304 } 305 306 } 307 308 protected AbstractTransactionalProcessor(final ExecutorService executorService) { 309 this.executorService = new AbstractExecutorService() { 310 311 @Override 312 public void execute(Runnable command) { 313 executorService.execute(new TransactionalRunnable(command)); 314 } 315 316 @Override 317 public List<Runnable> shutdownNow() { 318 throw new UnsupportedOperationException(); 319 } 320 321 @Override 322 public void shutdown() { 323 throw new UnsupportedOperationException(); 324 } 325 326 @Override 327 public boolean isTerminated() { 328 return executorService.isTerminated(); 329 } 330 331 @Override 332 public boolean isShutdown() { 333 return executorService.isShutdown(); 334 } 335 336 @Override 337 public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { 338 return executorService.awaitTermination(timeout, unit); 339 } 340 }; 341 } 342 343 private static final AtomicInteger processCounter = new AtomicInteger(); 344 345 private static final AtomicInteger instanceCounter = new AtomicInteger(); 346 347 private final int instanceNo = instanceCounter.incrementAndGet(); 348 349 /** 350 * Concrete implementations begin transaction and return transaction instance. 351 * @return 352 * @throws NotSupportedException 353 * @throws SystemException 354 */ 355 protected abstract Transaction beginTransaction() throws NotSupportedException, SystemException; 356 357 protected void registerXAResource(String uniqueId, XAResource xaResource) { 358 359 } 360 361 protected void unregisterXAResource(String uniqueId, XAResource xaResource) { 362 363 } 364 365 /** 366 * Transactionally processes the command. 367 * @param command 368 * @return 369 */ 370 public <V> V process(Command<KP, V> command, Context context, PropertySet<KP> propertySet, Object... args) throws Exception { 371 Transaction transaction = beginTransaction(); 372 threadTransaction.set(transaction); 373 374 String resourceIdPrefix = getClass().getName()+"_"+Integer.toString(instanceNo, Character.MAX_RADIX)+"_"+Integer.toString(processCounter.incrementAndGet(), Character.MAX_RADIX); 375 376 Map<String, XAResource> resourceMap = new ConcurrentHashMap<String, XAResource>(); 377 378 try { 379 com.hammurapi.common.concurrent.Logger logger; 380 if (context==null) { 381 logger = new LoggerWrapper(LOGGER); 382 } else { 383 logger = new CompositeLogger(context.lookupAll(com.hammurapi.common.concurrent.Logger.class)); 384 } 385 Undoer undoer = new Undoer(createProgressMonitor("Undoer"), logger.getLogger("Undoer")); 386 387 String undoerId = resourceIdPrefix+"_Undoer"; 388 389 resourceMap.put(undoerId, undoer); 390 registerXAResource(undoerId, undoer); 391 392 transaction.enlistResource(undoer); 393 394 AtomicInteger resourceCounter = new AtomicInteger(); 395 396 LocalTrackingExecutorService ltes = new LocalTrackingExecutorService(executorService, false, resourceIdPrefix+"_TrackingExecutorService"); 397 398 ExecutionContextImpl<V> ec = new ExecutionContextImpl<V>( 399 undoer, 400 command, 401 context, 402 propertySet, 403 args, 404 ltes, 405 resourceIdPrefix, 406 resourceCounter, 407 resourceMap); 408 409 Future<V> resultFuture = ltes.submit(ec); 410 411 ltes.join(); 412 // process pending in the loop 413 414 V result = resultFuture.get(); 415 416 transaction.commit(); 417 418 return result; 419 } catch (Exception e) { 420 LOGGER.log(Level.SEVERE, "Exception: "+e, e); 421 if (transaction!=null && transaction.getStatus()==Status.STATUS_ACTIVE) { 422 try { 423 transaction.rollback(); 424 } catch (Exception ex) { 425 LOGGER.log(Level.SEVERE, "Cannot rollback transaction: "+e, e); 426 } 427 } 428 throw e; 429 } finally { 430 for (Map.Entry<String, XAResource> entry: resourceMap.entrySet()) { 431 unregisterXAResource(entry.getKey(), entry.getValue()); 432 } 433 } 434 } 435 436 /** 437 * Creates mutable context for execution context. Subclasses can override this method to inject specific values into the context, 438 * e.g. hierarchical logger. 439 * @param owner 440 * @param chain 441 * @return 442 */ 443 protected MutableContext createContext(ExecutionContext<KP> owner, Context... chain) { 444 return new SimpleMutableContext(chain); 445 } 446 447}