001 package com.hammurapi.reasoning.impl;
002
003 import java.lang.reflect.Method;
004 import java.util.ArrayList;
005 import java.util.Collection;
006 import java.util.List;
007 import java.util.concurrent.Executor;
008 import java.util.concurrent.Future;
009 import java.util.concurrent.ScheduledExecutorService;
010
011 import com.hammurapi.config.bootstrap.ConfigurationException;
012 import com.hammurapi.config.runtime.ConfigurationContext;
013 import com.hammurapi.flow.Flow;
014 import com.hammurapi.flow.runtime.Invocable;
015 import com.hammurapi.flow.runtime.Invoker;
016 import com.hammurapi.flow.runtime.ProcessingPathElement;
017 import com.hammurapi.flow.runtime.PropertySet;
018 import com.hammurapi.flow.runtime.PropertySetFactory;
019 import com.hammurapi.flow.runtime.SynapseFactory;
020 import com.hammurapi.flow.runtime.Transition;
021 import com.hammurapi.flow.runtime.impl.ExecutorSynapseFactory;
022 import com.hammurapi.flow.runtime.impl.MapPropertySetFactory;
023 import com.hammurapi.flow.runtime.impl.TaskCountingFlowBase;
024 import com.hammurapi.reasoning.ExceptionHandler;
025 import com.hammurapi.reasoning.ReasoningException;
026 import com.hammurapi.util.Context;
027
028 /**
029 * Flow which implements rule set.
030 * @author Pavel Vlasov
031 *
032 */
033 public class RuleSetFlow<F> extends TaskCountingFlowBase<ReasoningNodeBase<F>, Transition<KnowledgeBase<F>, InferenceToken<F>>, List<?>, Integer, KnowledgeBase<F>, InferenceToken<F>> {
034
035 private List<?> rules;
036 private KnowledgeBase<F> knowledgeBase;
037 private Executor executor;
038 private ScheduledExecutorService scheduler;
039 private PropertySetFactory propertySetFactory;
040
041 private List<Exception> collectedExceptions = new ArrayList<Exception>();
042
043 private ExceptionHandler exceptionHandler = new ExceptionHandler() {
044
045 @Override
046 public void handleException(Exception e) throws Exception {
047 synchronized (collectedExceptions) {
048 collectedExceptions.add(e);
049 }
050 }
051 };
052
053 public void checkCollectedExceptions() throws RuleExecutionException {
054 synchronized (collectedExceptions) {
055 if (!collectedExceptions.isEmpty()) {
056 try {
057 throw new RuleExecutionException(collectedExceptions);
058 } finally {
059 collectedExceptions.clear();
060 }
061 }
062 }
063 }
064
065 public KnowledgeBase<F> getKnowledgeBase() {
066 return knowledgeBase;
067 }
068
069 public void setKnowledgeBase(KnowledgeBase<F> knowledgeBase) {
070 this.knowledgeBase = knowledgeBase;
071 }
072
073 public void setExecutor(Executor executor) {
074 this.executor = executor;
075 }
076
077 public Executor getExecutor() {
078 return executor;
079 }
080
081 public void setPropertySetFactory(PropertySetFactory propertySetFactory) {
082 this.propertySetFactory = propertySetFactory;
083 }
084
085 public PropertySetFactory getPropertySetFactory() {
086 return propertySetFactory;
087 }
088
089 public void setScheduler(ScheduledExecutorService scheduler) {
090 this.scheduler = scheduler;
091 }
092
093 public ScheduledExecutorService getScheduler() {
094 return scheduler;
095 }
096
097 @Override
098 public void init(ConfigurationContext<Flow> context) throws ConfigurationException {
099 if (executor==null) {
100 executor = context.lookup(Executor.class);
101 }
102
103 if (scheduler==null) {
104 scheduler = context.lookup(ScheduledExecutorService.class);
105 }
106
107 if (knowledgeBase==null) {
108 Flow def = context.getDefinition();
109 knowledgeBase = new InMemoryKnowledgeBase<F>(def==null ? "Knowledge base" : def.getName(), def==null ? null : def.getDescription(), false);
110 ((InMemoryKnowledgeBase<F>) knowledgeBase).setExecutor(executor);
111 }
112
113 if (propertySetFactory==null) {
114 propertySetFactory = context.lookup(PropertySetFactory.class);
115 if (propertySetFactory==null) {
116 propertySetFactory = new MapPropertySetFactory(executor, scheduler);
117 }
118 }
119
120 if (getSynapseFactory()==null) {
121 setSynapseFactory(context.lookup(SynapseFactory.class));
122 }
123
124 if (getSynapseFactory()==null && executor!=null) {
125 PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>> pesf = new PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>>();
126 setSynapseFactory(pesf);
127 pesf.setExecutor(executor);
128 pesf.setScheduler(scheduler);
129 pesf.setPropertySetFactory(propertySetFactory);
130 } else if (getSynapseFactory() instanceof ExecutorSynapseFactory) {
131 ExecutorSynapseFactory<Integer, KnowledgeBase<F>, InferenceToken<F>> esf = (ExecutorSynapseFactory<Integer, KnowledgeBase<F>, InferenceToken<F>>) getSynapseFactory();
132 esf.setExecutor(executor);
133 esf.setScheduler(scheduler);
134 esf.setPropertySetFactory(propertySetFactory);
135 }
136
137 if (getSynapseFactory() instanceof PriorityExecutorSynapseFactory) {
138 ((PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>>) getSynapseFactory()).setExceptionHandler(exceptionHandler);
139 }
140
141
142 super.init(context);
143
144 if (transitions!=null) {
145 for (Transition<KnowledgeBase<F>, InferenceToken<F>> t:transitions) {
146 if (t instanceof ReasoningTransitionBase) {
147 ((ReasoningTransitionBase) t).setKnowledgeBase(knowledgeBase);
148 }
149 }
150 }
151 if (nodes!=null) {
152 for (ReasoningNodeBase<F> n:nodes) {
153 n.setKnowledgeBase(knowledgeBase);
154 }
155 }
156 }
157
158 public void setRules(List<?> rules) throws ConfigurationException {
159 this.rules = rules;
160 if (transitions!=null) {
161 for (Transition<KnowledgeBase<F>,InferenceToken<F>> t:transitions) {
162 if (t instanceof ReasoningTransitionBase) {
163 ((ReasoningTransitionBase) t).setRules(rules);
164 }
165 }
166 }
167 if (nodes!=null) {
168 for (ReasoningNodeBase<F> n:nodes) {
169 n.setRules(rules);
170 }
171 }
172 }
173
174 @Override
175 public void setTransitions(List<Transition<KnowledgeBase<F>,InferenceToken<F>>> transitions) throws ConfigurationException {
176 super.setTransitions(transitions);
177 for (Transition<KnowledgeBase<F>,InferenceToken<F>> t:transitions) {
178 if (t instanceof ReasoningTransitionBase) {
179 if (rules!=null) {
180 ((ReasoningTransitionBase) t).setRules(rules);
181 }
182 if (knowledgeBase!=null) {
183 ((ReasoningTransitionBase) t).setKnowledgeBase(knowledgeBase);
184 }
185 }
186 }
187 }
188
189 @Override
190 public void setNodes(List<ReasoningNodeBase<F>> nodes) throws ConfigurationException {
191 super.setNodes(nodes);
192 for (ReasoningNodeBase<F> n:nodes) {
193 if (rules!=null) {
194 n.setRules(rules);
195 }
196 if (knowledgeBase!=null) {
197 n.setKnowledgeBase(knowledgeBase);
198 }
199 }
200 }
201
202 @Override
203 protected Invocable<KnowledgeBase<F>,InferenceToken<F>> getInternalInvocable(String pinName, Integer connectKey) {
204 throw new UnsupportedOperationException("RuleSet flow doesn't have outputs");
205 }
206
207 @Override
208 protected Invoker<KnowledgeBase<F>,InferenceToken<F>> getInternalInvoker(String pinName, Integer connectKey) {
209 if (pinName.equals(Constants.PUT)) {
210 return new Invoker<KnowledgeBase<F>,InferenceToken<F>>() {
211
212 @Override
213 public void setInvocable(Invocable<KnowledgeBase<F>,InferenceToken<F>> invocable) throws IllegalStateException {
214 synchronized (addObjectInvocables) {
215 addObjectInvocables.add(invocable);
216 }
217 }
218
219 };
220 }
221 throw new IllegalArgumentException("Invalid pin name: "+pinName);
222 }
223
224 @Override
225 public void setBackEnd(Object object) {
226 throw new UnsupportedOperationException("RuleSet flow doesn't support back-end object.");
227 }
228
229 private Collection<Invocable<KnowledgeBase<F>,InferenceToken<F>>> addObjectInvocables = new ArrayList<Invocable<KnowledgeBase<F>,InferenceToken<F>>>();
230
231 @Override
232 public Invocable<KnowledgeBase<F>,InferenceToken<F>> getInvocable(String pinName, Integer connectKey) throws ConfigurationException {
233 if (pinName.equals(Constants.PUT)) {
234 return new Invocable<KnowledgeBase<F>,InferenceToken<F>>() {
235
236 @Override
237 public Collection<Future<?>> invoke(
238 KnowledgeBase<F> flowState,
239 InferenceToken<F>[] args,
240 PropertySet properties,
241 Context context,
242 List<ProcessingPathElement> processingPath) throws Exception {
243 Collection<Future<?>> ret = new ArrayList<Future<?>>();
244
245 for (Invocable<KnowledgeBase<F>,InferenceToken<F>> i: addObjectInvocables) {
246 ret.addAll(i.invoke(flowState, args, properties, context, processingPath));
247 }
248 return ret;
249 }
250
251 };
252 }
253 throw new IllegalArgumentException("Invalid pin name: "+pinName);
254 }
255
256 @Override
257 public Invoker<KnowledgeBase<F>,InferenceToken<F>> getInvoker(String pinName, Integer connectKey) throws ConfigurationException {
258 throw new UnsupportedOperationException("RuleSet flow doesn't have outputs.");
259 }
260
261 @Override
262 protected Object convertResult(Object proxy, Method method, Object[] args, Collection<Future<?>> invocationResult) throws Exception {
263 return invocationResult.iterator().next().get();
264 }
265
266 private List<PinEntry<F>> pins = new ArrayList<PinEntry<F>>();
267
268 @Override
269 public void addPin(String pinName, List<?> pinConfig) {
270 pins.add(new PinEntry<F>(pinName, pinConfig));
271 }
272
273 private String name;
274 private String description;
275
276 public String getDescription() {
277 return description;
278 }
279
280 public String getName() {
281 return name;
282 }
283
284 public void setName(String name) {
285 this.name = name;
286 }
287
288 public void setDescription(String description) {
289 this.description = description;
290 }
291
292 void reset() throws ReasoningException {
293 for (Object rule: rules) {
294 if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
295 ((com.hammurapi.reasoning.spi.Rule) rule).reset();
296 }
297 }
298 }
299
300 void release() throws ReasoningException {
301 for (Object rule: rules) {
302 if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
303 ((com.hammurapi.reasoning.spi.Rule) rule).release();
304 }
305 }
306 }
307
308 void initRules() throws ReasoningException {
309 for (Object rule: rules) {
310 if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
311 ((com.hammurapi.reasoning.spi.Rule) rule).init();
312 }
313 }
314 }
315
316 }