001    package com.hammurapi.reasoning.impl;
002    
003    import java.util.Collections;
004    import java.util.List;
005    import java.util.concurrent.Callable;
006    import java.util.concurrent.Future;
007    import java.util.concurrent.FutureTask;
008    
009    import com.hammurapi.config.bootstrap.ConfigurationException;
010    import com.hammurapi.config.bootstrap.Destroyable;
011    import com.hammurapi.flow.runtime.Invocable;
012    import com.hammurapi.flow.runtime.Invoker;
013    import com.hammurapi.flow.runtime.ProcessingPathElement;
014    import com.hammurapi.flow.runtime.impl.ExecutorSynapseFactory;
015    import com.hammurapi.reasoning.ExceptionHandler;
016    import com.hammurapi.util.concurrent.TaskCounter;
017    
018    /**
019     * 
020     * @author Pavel Vlasov
021     *
022     * @param <S> Flow state type
023     * @param <A> Argument type
024     */
025    public class PriorityExecutorSynapseFactory<S,A> extends ExecutorSynapseFactory<Integer, S, A> {
026            
027            @Override
028            public Destroyable connect(
029                            Invoker<S,A> invoker, 
030                            Invocable<S,A> invocable,
031                            com.hammurapi.flow.runtime.SynapseFactory.SynapseConfig config,
032                            Integer connectKey, 
033                            ProcessingPathElement processingPathElement,
034                            TaskCounter taskCounter) throws ConfigurationException {
035                    
036                    if (connectKey==null) {
037                            throw new NullPointerException("Connect key (priority) must not be null."); 
038                    }
039                    return super.connect(invoker, invocable, config, connectKey, processingPathElement, taskCounter);
040            }
041            
042            @Override
043            protected FutureTask<List<Future<?>>> createTask(
044                            final Callable<List<Future<?>>> callable, 
045                            TaskCounter taskCounter,
046                            Integer connectKey, 
047                            S flowState,
048                            A[] args) throws Exception {
049                    
050                    Callable<List<Future<?>>> exceptionHandlingCallable = new Callable<List<Future<?>>>() {
051    
052                            @Override
053                            public List<Future<?>> call() throws Exception {
054                                    if (exceptionHandler==null) {
055                                            return callable.call();
056                                    }
057                                    try {
058                                            return callable.call();
059                                    } catch (Exception e) {
060                                            exceptionHandler.handleException(e);
061                                            return Collections.emptyList(); // If exception is handled.
062                                    }
063                            }
064                    };
065                    
066                    return new NonBlockingPriorityNotifyingFutureTask<List<Future<?>>, A>(exceptionHandlingCallable, taskCounter, args, connectKey);
067            }
068            
069            private ExceptionHandler exceptionHandler;
070            
071            public void setExceptionHandler(ExceptionHandler exceptionHandler) {
072                    this.exceptionHandler = exceptionHandler;
073            }
074            
075            public ExceptionHandler getExceptionHandler() {
076                    return exceptionHandler;
077            }
078    
079    }