001package com.hammurapi.common.concurrent.transactions;
002
003import java.util.ArrayList;
004import java.util.Collection;
005import java.util.Iterator;
006import java.util.List;
007import java.util.Map;
008import java.util.concurrent.AbstractExecutorService;
009import java.util.concurrent.Callable;
010import java.util.concurrent.ConcurrentHashMap;
011import java.util.concurrent.ExecutorService;
012import java.util.concurrent.Future;
013import java.util.concurrent.TimeUnit;
014import java.util.concurrent.atomic.AtomicInteger;
015import java.util.logging.Level;
016import java.util.logging.Logger;
017
018import javax.transaction.NotSupportedException;
019import javax.transaction.Status;
020import javax.transaction.SystemException;
021import javax.transaction.Transaction;
022import javax.transaction.xa.XAException;
023import javax.transaction.xa.XAResource;
024
025import com.hammurapi.common.Context;
026import com.hammurapi.common.MutableContext;
027import com.hammurapi.common.SimpleMutableContext;
028import com.hammurapi.common.concurrent.CompositeLogger;
029import com.hammurapi.common.concurrent.LocalTrackingExecutorService;
030import com.hammurapi.common.concurrent.LoggerWrapper;
031import com.hammurapi.common.concurrent.PropertySet;
032import com.hammurapi.extract.Predicate;
033
034/**
035 * This class performs concurrent transactional computations. 
036 * @author Pavel Vlasov
037 *
038 * @param <KP>
039 */
040public abstract class AbstractTransactionalProcessor<KP> {
041        
042        /**
043         * Interface to propagate transactions between threads.
044         * @author Pavel Vlasov
045         *
046         */
047        protected interface TransactionPropagation {
048                
049                /**
050                 * This method is invoked in the thread where transaction shall be propagated to.
051                 * @return Propagated transaction.
052                 */
053                Transaction getTransaction();
054                
055                /**
056                 * Disassociates transaction from the caller thread.
057                 */
058                void dispose();
059        }
060        
061        /**
062         * Propagates transaction object AS-IS
063         * @author Pavel Vlasov
064         *
065         */
066        protected class SimpleTransactionPropagation implements TransactionPropagation {
067                
068                private Transaction transaction;
069
070                public SimpleTransactionPropagation(Transaction transaction) {
071                        this.transaction = transaction;
072                }
073
074                @Override
075                public Transaction getTransaction() {
076                        return transaction;
077                }
078
079                @Override
080                public void dispose() {
081                        // NOP                  
082                }
083                
084        }
085        
086        /**
087         * Creates transaction propagation. This implementation creates SimpleTransactionPropagation
088         * @param transaction
089         * @return
090         */
091        protected TransactionPropagation createTransactionPropagation(Transaction transaction) {
092                return new SimpleTransactionPropagation(transaction);
093        }
094        
095        protected ThreadLocal<Transaction> threadTransaction = new ThreadLocal<Transaction>();
096        
097        /**
098         * @return Thread's context transaction.
099         */
100        public Transaction getCurrentTransaction() {
101                return threadTransaction.get();
102        }
103                
104        private class TransactionalRunnable implements Runnable {
105                
106                private TransactionPropagation transactionPropagation;
107                private Runnable master;
108
109                public TransactionalRunnable(Runnable master) {
110                        this.master = master;
111                        this.transactionPropagation = createTransactionPropagation(threadTransaction.get());
112                }
113
114                @Override
115                public void run() {
116                        Transaction transaction = transactionPropagation.getTransaction();
117                        Transaction prevTransaction = threadTransaction.get();
118                        threadTransaction.set(transaction);
119                        try {
120                                master.run();
121                        } catch (RuntimeException e) {
122                                LOGGER.log(Level.SEVERE, "Task "+master+" execution failed, setting transaction to rollback only: "+e, e);
123                                if (transaction!=null) {
124                                        try {
125                                                transaction.setRollbackOnly();
126                                        } catch (SystemException se) {
127                                                LOGGER.log(Level.SEVERE, "Transactional execution failed: "+se, se);
128                                                throw new ProcessingException(se);
129                                        }
130                                }
131                                throw e;
132                        } finally {
133                                threadTransaction.set(prevTransaction);
134                                transactionPropagation.dispose();
135                        }
136                }                                                               
137        }
138        
139        
140        ExecutorService executorService;
141
142        protected abstract PropertySet<KP> createPropertySet(ExecutorService executorService, Context context, PropertySet<KP>... chain);
143        
144        public PropertySet<KP> createPropertySet(Context context, PropertySet<KP>... chain) {
145                return createPropertySet(executorService, context, chain);
146        }
147        
148        static final Logger LOGGER = Logger.getLogger(AbstractTransactionalProcessor.class.getName());
149        
150        static final ProgressMonitor NOP_PROGRESS_MONITOR = new ProgressMonitor() {
151                
152                @Override
153                public void onProgress(int completed, int active, int pending,  String message) {
154                        // NOP
155                        
156                }
157        };
158        
159        private ProgressMonitorFactory progressMonitorFactory;
160        
161        public void setProgressMonitorFactory(ProgressMonitorFactory progressMonitorFactory) {
162                this.progressMonitorFactory = progressMonitorFactory;
163        }
164
165        protected ProgressMonitor createProgressMonitor(String name) {
166                if (progressMonitorFactory==null) {
167                        return NOP_PROGRESS_MONITOR;
168                }
169                return progressMonitorFactory.createProgressMonitor(name);
170        }
171                
172        private class ExecutionContextImpl<V> implements ExecutionContext<KP>, Callable<V> {
173                
174                private Undoer undoer;
175                private Command<KP, V> command;
176                private PropertySet<KP> propertySet;
177                private Object[] args;
178                private ExecutorService executorService;
179                private String resourceIdPrefix;
180                private AtomicInteger resourceCounter;
181                private Map<String, XAResource> resourceMap;
182                private MutableContext context;
183                private com.hammurapi.common.concurrent.Logger logger;
184
185                ExecutionContextImpl(
186                                Undoer undoer, 
187                                Command<KP,V> command, 
188                                Context chain,
189                                PropertySet<KP> propertySet, 
190                                Object[] args,
191                                ExecutorService executorService, 
192                                String resourceIdPrefix, 
193                                AtomicInteger resourceCounter, 
194                                Map<String, XAResource> resourceMap) {
195                        
196                        this.undoer = undoer;
197                        this.command = command;
198                        this.context = AbstractTransactionalProcessor.this.createContext(this, chain);
199                        this.propertySet = propertySet;
200                        this.args = args;
201                        this.executorService = executorService;
202                        context.register(ExecutorService.class, executorService);
203                        this.resourceIdPrefix = resourceIdPrefix;
204                        this.resourceCounter = resourceCounter;
205                        this.resourceMap = resourceMap;
206                        
207                        final Collection<com.hammurapi.common.concurrent.Logger> contextLoggers = new ArrayList<com.hammurapi.common.concurrent.Logger>();
208                        Iterator<com.hammurapi.common.concurrent.Logger> lit = this.context.lookupAll(com.hammurapi.common.concurrent.Logger.class);
209                        while (lit.hasNext()) {
210                                contextLoggers.add(lit.next().getLogger(command.toString()));
211                        }
212                        this.logger = new CompositeLogger(contextLoggers.iterator());
213
214                }       
215                
216                @Override
217                public Transaction getTransaction() {
218                        return getCurrentTransaction();
219                }
220
221                @Override
222                public Object lookup(String name) {
223                        return context.lookup(name);
224                }
225
226                @Override
227                public <T> T lookup(Class<T> serviceClass, Predicate<T, Context>... selectors) {
228                        return context.lookup(serviceClass, selectors);
229                }
230
231                @Override
232                public <T> Iterator<T> lookupAll(Class<T> serviceClass, Predicate<T, Context>... selectors) {
233                        return context.lookupAll(serviceClass, selectors);
234                }
235
236                @Override
237                public void addUndo(Runnable undoTask) {
238                        try {
239                                undoer.addUndoTask(undoTask);
240                        } catch (XAException e) {
241                                getLogger().log(Level.SEVERE, "Failed to add undo task: "+e, e);
242                                try {
243                                        getTransaction().setRollbackOnly();
244                                } catch (IllegalStateException ise) {
245                                getLogger().log(Level.SEVERE, "Sub-transaction rollback failed: "+ise, ise);
246                                } catch (SystemException se) {
247                                getLogger().log(Level.SEVERE, "Sub-transaction rollback failed: "+se, se);
248                                }
249                        }                       
250                }
251
252                @Override
253                public PropertySet<KP> createPropertySet(PropertySet<KP>... chain) {
254                        return AbstractTransactionalProcessor.this.createPropertySet(executorService, this, chain);
255                }
256
257                @Override
258                public V call() throws Exception {
259                        return command.execute(this, propertySet, args);
260                }
261                
262                @Override
263                public <U> Future<U> execute(final Command<KP, U> command, Context context, PropertySet<KP> propertySet, Object... args) {
264                        ExecutionContextImpl<U> ec = new ExecutionContextImpl<U>(
265                                        undoer, 
266                                        command, 
267                                        context, 
268                                        propertySet, 
269                                        args, 
270                                        executorService, 
271                                        resourceIdPrefix, 
272                                        resourceCounter, 
273                                        resourceMap);
274                        return lookup(ExecutorService.class).submit(ec);
275                }
276
277                @Override
278                public void yield() {
279                        throw new UnsupportedOperationException();                      
280                }
281
282                @Override
283                public ProgressMonitor createProgressMonitor(String name) {
284                        return AbstractTransactionalProcessor.this.createProgressMonitor(name);
285                }
286
287                @Override
288                public void registerXAResource(XAResource xaResource) {
289                        String uniqueName = resourceIdPrefix+"_"+Integer.toString(resourceCounter.incrementAndGet(), Character.MAX_RADIX);
290                        if (resourceMap.put(uniqueName, xaResource)!=null) {
291                                throw new ProcessingException("Duplicate resource name: "+uniqueName);
292                        }
293                        AbstractTransactionalProcessor.this.registerXAResource(uniqueName, xaResource);                 
294                }
295
296                @Override
297                public MutableContext createContext() {
298                        return new SimpleMutableContext(this);
299                }
300
301                @Override
302                public com.hammurapi.common.concurrent.Logger getLogger() {                     
303                        return logger;
304                }
305                
306        }               
307                
308        protected AbstractTransactionalProcessor(final ExecutorService executorService) {                       
309                this.executorService = new AbstractExecutorService() {
310                        
311                        @Override
312                        public void execute(Runnable command) {
313                                executorService.execute(new TransactionalRunnable(command));                            
314                        }
315                        
316                        @Override
317                        public List<Runnable> shutdownNow() {
318                                throw new UnsupportedOperationException();
319                        }
320                        
321                        @Override
322                        public void shutdown() {
323                                throw new UnsupportedOperationException();
324                        }
325                        
326                        @Override
327                        public boolean isTerminated() {
328                                return executorService.isTerminated();
329                        }
330                        
331                        @Override
332                        public boolean isShutdown() {
333                                return executorService.isShutdown();
334                        }
335                        
336                        @Override
337                        public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException {
338                                return executorService.awaitTermination(timeout, unit);
339                        }
340                };
341        }
342                
343        private static final AtomicInteger processCounter = new AtomicInteger();
344        
345        private static final AtomicInteger instanceCounter = new AtomicInteger();
346        
347        private final int instanceNo = instanceCounter.incrementAndGet();
348
349        /**
350         * Concrete implementations begin transaction and return transaction instance.
351         * @return
352         * @throws NotSupportedException
353         * @throws SystemException
354         */
355        protected abstract Transaction beginTransaction() throws NotSupportedException, SystemException;
356        
357        protected void registerXAResource(String uniqueId, XAResource xaResource) {
358                
359        }
360        
361        protected void unregisterXAResource(String uniqueId, XAResource xaResource) {
362                
363        }
364
365        /**
366         * Transactionally processes the command.
367         * @param command
368         * @return
369         */
370        public <V> V process(Command<KP, V> command, Context context, PropertySet<KP> propertySet, Object... args) throws Exception {
371                Transaction transaction = beginTransaction();
372                threadTransaction.set(transaction);
373                
374                String resourceIdPrefix = getClass().getName()+"_"+Integer.toString(instanceNo, Character.MAX_RADIX)+"_"+Integer.toString(processCounter.incrementAndGet(), Character.MAX_RADIX);
375
376                Map<String, XAResource> resourceMap = new ConcurrentHashMap<String, XAResource>();
377                
378                try {           
379                        com.hammurapi.common.concurrent.Logger logger;
380                        if (context==null) {
381                                logger = new LoggerWrapper(LOGGER);
382                        } else {
383                                logger = new CompositeLogger(context.lookupAll(com.hammurapi.common.concurrent.Logger.class));
384                        }
385                        Undoer undoer = new Undoer(createProgressMonitor("Undoer"), logger.getLogger("Undoer"));
386                        
387                        String undoerId = resourceIdPrefix+"_Undoer";
388                        
389                        resourceMap.put(undoerId, undoer);
390                        registerXAResource(undoerId, undoer);
391                        
392                        transaction.enlistResource(undoer);
393                                                
394                        AtomicInteger resourceCounter = new AtomicInteger();            
395                        
396                        LocalTrackingExecutorService ltes = new LocalTrackingExecutorService(executorService, false, resourceIdPrefix+"_TrackingExecutorService");
397                        
398                        ExecutionContextImpl<V> ec = new ExecutionContextImpl<V>(
399                                        undoer, 
400                                        command, 
401                                        context, 
402                                        propertySet, 
403                                        args, 
404                                        ltes, 
405                                        resourceIdPrefix, 
406                                        resourceCounter, 
407                                        resourceMap);
408                        
409                        Future<V> resultFuture = ltes.submit(ec);
410
411                        ltes.join();
412                        // process pending in the loop                  
413                        
414                        V result = resultFuture.get();
415                        
416                        transaction.commit();
417                        
418                        return result;
419                } catch (Exception e) {
420                        LOGGER.log(Level.SEVERE, "Exception: "+e, e);
421                        if (transaction!=null && transaction.getStatus()==Status.STATUS_ACTIVE) {
422                                try {
423                                        transaction.rollback();
424                                } catch (Exception ex) {
425                                        LOGGER.log(Level.SEVERE, "Cannot rollback transaction: "+e, e);                                 
426                                }
427                        }
428                        throw e;
429                } finally {
430                        for (Map.Entry<String, XAResource> entry: resourceMap.entrySet()) {
431                                unregisterXAResource(entry.getKey(), entry.getValue());
432                        }
433                }                                                                       
434        }
435
436        /**
437         * Creates mutable context for execution context. Subclasses can override this method to inject specific values into the context, 
438         * e.g. hierarchical logger.
439         * @param owner
440         * @param chain
441         * @return
442         */
443        protected MutableContext createContext(ExecutionContext<KP> owner, Context... chain) {
444                return new SimpleMutableContext(chain);
445        }
446
447}