001 package com.hammurapi.reasoning.impl;
002
003 import java.util.Collections;
004 import java.util.List;
005 import java.util.concurrent.Callable;
006 import java.util.concurrent.Future;
007 import java.util.concurrent.FutureTask;
008
009 import com.hammurapi.config.bootstrap.ConfigurationException;
010 import com.hammurapi.config.bootstrap.Destroyable;
011 import com.hammurapi.flow.runtime.Invocable;
012 import com.hammurapi.flow.runtime.Invoker;
013 import com.hammurapi.flow.runtime.ProcessingPathElement;
014 import com.hammurapi.flow.runtime.impl.ExecutorSynapseFactory;
015 import com.hammurapi.reasoning.ExceptionHandler;
016 import com.hammurapi.util.concurrent.TaskCounter;
017
018 /**
019 *
020 * @author Pavel Vlasov
021 *
022 * @param <S> Flow state type
023 * @param <A> Argument type
024 */
025 public class PriorityExecutorSynapseFactory<S,A> extends ExecutorSynapseFactory<Integer, S, A> {
026
027 @Override
028 public Destroyable connect(
029 Invoker<S,A> invoker,
030 Invocable<S,A> invocable,
031 com.hammurapi.flow.runtime.SynapseFactory.SynapseConfig config,
032 Integer connectKey,
033 ProcessingPathElement processingPathElement,
034 TaskCounter taskCounter) throws ConfigurationException {
035
036 if (connectKey==null) {
037 throw new NullPointerException("Connect key (priority) must not be null.");
038 }
039 return super.connect(invoker, invocable, config, connectKey, processingPathElement, taskCounter);
040 }
041
042 @Override
043 protected FutureTask<List<Future<?>>> createTask(
044 final Callable<List<Future<?>>> callable,
045 TaskCounter taskCounter,
046 Integer connectKey,
047 S flowState,
048 A[] args) throws Exception {
049
050 Callable<List<Future<?>>> exceptionHandlingCallable = new Callable<List<Future<?>>>() {
051
052 @Override
053 public List<Future<?>> call() throws Exception {
054 if (exceptionHandler==null) {
055 return callable.call();
056 }
057 try {
058 return callable.call();
059 } catch (Exception e) {
060 exceptionHandler.handleException(e);
061 return Collections.emptyList(); // If exception is handled.
062 }
063 }
064 };
065
066 return new NonBlockingPriorityNotifyingFutureTask<List<Future<?>>, A>(exceptionHandlingCallable, taskCounter, args, connectKey);
067 }
068
069 private ExceptionHandler exceptionHandler;
070
071 public void setExceptionHandler(ExceptionHandler exceptionHandler) {
072 this.exceptionHandler = exceptionHandler;
073 }
074
075 public ExceptionHandler getExceptionHandler() {
076 return exceptionHandler;
077 }
078
079 }