001 package com.hammurapi.reasoning.impl;
002
003 import java.lang.reflect.Method;
004 import java.util.ArrayList;
005 import java.util.Collection;
006 import java.util.Collections;
007 import java.util.Iterator;
008 import java.util.List;
009 import java.util.Set;
010 import java.util.concurrent.Future;
011 import java.util.logging.Level;
012 import java.util.logging.Logger;
013
014 import com.hammurapi.config.bootstrap.ConfigurationException;
015 import com.hammurapi.flow.runtime.Invocable;
016 import com.hammurapi.flow.runtime.Invoker;
017 import com.hammurapi.flow.runtime.ProcessingPathElement;
018 import com.hammurapi.flow.runtime.PropertySet;
019 import com.hammurapi.flow.runtime.impl.Joiner;
020 import com.hammurapi.flow.runtime.impl.ThreadInvocation;
021 import com.hammurapi.flow.runtime.impl.Joiner.InputConsumer;
022 import com.hammurapi.util.Context;
023
024 /**
025 * Base class for reasoning nodes
026 * @author Pavel Vlasov
027 *
028 * @param <F> Fact type.
029 */
030 public abstract class ReasoningNodeBase<F> implements com.hammurapi.flow.runtime.Node<List<?>, Integer, KnowledgeBase<F>, InferenceToken<F>> {
031 protected final Logger logger = Logger.getLogger(getClass().getName());
032 protected final boolean isFine = logger.isLoggable(Level.FINE);
033
034 private int ruleIndex;
035 protected Object rule;
036 protected KnowledgeBase<F> knowledgeBase;
037
038 protected Method method;
039
040 protected Class<?>[] parameterTypes;
041
042 public void setParameterTypes(List<Class<?>> parameterTypes) {
043 this.parameterTypes = parameterTypes.toArray(new Class[parameterTypes.size()]);
044 }
045
046 @Override
047 public String toString() {
048 StringBuilder sb = new StringBuilder();
049 sb.append("["+getClass().getName()+"]");
050 if (rule!=null) {
051 sb.append(" rule="+rule);
052 }
053 if (method!=null) {
054 sb.append(" method="+method);
055 }
056 return sb.toString();
057 }
058
059 private class JoinContext {
060 PropertySet properties;
061 Context context;
062 List<ProcessingPathElement> processingPath;
063
064 JoinContext(PropertySet properties, Context context, List<ProcessingPathElement> processingPath) {
065 super();
066 this.properties = properties;
067 this.context = context;
068 this.processingPath = processingPath;
069 }
070
071 }
072
073 private Joiner<List<InferenceToken<F>>, JoinContext, Collection<Future<?>>> joiner;
074
075 public void setRuleIndex(int ruleIndex) {
076 this.ruleIndex = ruleIndex;
077 }
078
079 public void setRules(List<?> rules) throws ConfigurationException {
080 this.rule = rules.get(ruleIndex);
081 if (methodName!=null) {
082 try {
083 method = rule.getClass().getMethod(methodName, parameterTypes);
084 } catch (Exception e) {
085 throw new ConfigurationException(e);
086 }
087 }
088 }
089
090
091 public void setKnowledgeBase(KnowledgeBase<F> knowledgeBase) {
092 this.knowledgeBase = knowledgeBase;
093 for (InputCollectorRequest r: inputCollectorsRequests) {
094 r.execute();
095 }
096 inputCollectorsRequests.clear();
097 }
098
099 private List<InputCollectorRequest> inputCollectorsRequests = new ArrayList<InputCollectorRequest>();
100 private ArrayList<PinEntry<F>> inputs;
101
102 protected ArrayList<PinEntry<F>> getInputPins() {
103 return inputs;
104 }
105
106 private class InputCollectorRequest {
107 private int idx;
108 private Set<List<InferenceToken<F>>>[] inputCollectors;
109 String pinName;
110
111 InputCollectorRequest(int idx, String pinName, Set<List<InferenceToken<F>>>[] inputCollectors) {
112 this.idx = idx;
113 this.inputCollectors = inputCollectors;
114 this.pinName = pinName;
115 }
116
117 @SuppressWarnings("unchecked")
118 void execute() {
119 inputCollectors[idx] = (Set<List<InferenceToken<F>>>) knowledgeBase.getSet(pinName, List.class);
120 }
121
122
123 }
124
125 @Override
126 public void beforeConnect() throws ConfigurationException {
127 // NOP
128 }
129
130 @SuppressWarnings("unchecked")
131 @Override
132 public void afterConnect() throws ConfigurationException {
133 // Count input pins, if more than one - create joiner
134 inputs = new ArrayList<PinEntry<F>>();
135 for (PinEntry<F> pin: pins) {
136 if (pin.getName().startsWith(Constants.INPUT)) {
137 inputs.add(pin);
138 } else if (pin.getName().startsWith(Constants.OUTPUT)) {
139 if (pin.outputs!=null) {
140 Collections.sort(pin.outputs);
141 }
142 }
143 }
144
145 if (inputs.isEmpty()) {
146 throw new ConfigurationException("Inference node with no inputs");
147 }
148
149 sortInputs();
150
151 if (inputs.size()>1) {
152 final Set<List<InferenceToken<F>>>[] inputCollectors = new Set[inputs.size()];
153
154 Iterator<PinEntry<F>> it = inputs.iterator();
155 for (int i=0; it.hasNext(); ++i) {
156 PinEntry<F> pin = it.next();
157 pin.index = i;
158 inputCollectorsRequests.add(new InputCollectorRequest(i, pin.getName(), inputCollectors));
159 }
160
161 Class<?> cls = List.class;
162 joiner = new Joiner<List<InferenceToken<F>>, JoinContext, Collection<Future<?>>>(inputCollectors, (Class<List<InferenceToken<F>>>) cls, false) {
163
164 @Override
165 protected boolean isValidInput(int index, List<InferenceToken<F>> input) {
166 for (InferenceToken<F> t: input) {
167 if (t.isConsumed() || ((HandleImpl<F>) t.getHandle()).get()==null) {
168 return false;
169 }
170 }
171 return true;
172 }
173
174 @Override
175 protected Collection<Future<?>> join(List<InferenceToken<F>>[] inputs, JoinContext context, InputConsumer consumer, int activator) throws Exception {
176 return ReasoningNodeBase.this.process(
177 knowledgeBase,
178 inputs,
179 context.properties,
180 context.context,
181 context.processingPath,
182 consumer,
183 activator);
184 }
185
186 @Override
187 protected boolean partialJoin(List<InferenceToken<F>>[] inputs, int index) throws Exception {
188 return ReasoningNodeBase.this.partialJoin(inputs, index);
189 }
190 };
191 }
192
193 }
194
195 /**
196 * This implementation does nothing.
197 * Condition nodes can override this method to sort input pins to optimize inference by
198 * moving pins participating in condition to first places and evaluating conditions
199 * as soon as possible in partialJoin() method.
200 * @param inputPins Input pins to sort.
201 */
202 protected void sortInputs() {
203 // NOP
204 }
205
206 /**
207 * Validates already joined inputs.
208 * @param inputs Inputs
209 * @param index Index of last already joined input.
210 * @return
211 */
212 protected boolean partialJoin(List<InferenceToken<F>>[] inputs, int index) throws Exception {
213 return true;
214 }
215
216 protected List<PinEntry<F>> pins = new ArrayList<PinEntry<F>>();
217
218 @Override
219 public void addPin(String pinName, List<?> pinConfig) {
220 pins.add(new PinEntry<F>(pinName, pinConfig));
221 }
222
223 @Override
224 public Invocable<KnowledgeBase<F>, InferenceToken<F>> getInvocable(String pinName, Integer connectKey) throws ConfigurationException {
225 for (PinEntry<F> pin: pins) {
226 if (pinName.equals(pin.getName())) {
227 if (pin.invocable==null) {
228 final PinEntry<F> pe = pin;
229 pin.invocable = new Invocable<KnowledgeBase<F>,InferenceToken<F>>() {
230
231 @Override
232 public Collection<Future<?>> invoke(
233 KnowledgeBase<F> flowState,
234 InferenceToken<F>[] args,
235 PropertySet properties,
236 Context context,
237 List<ProcessingPathElement> processingPath) throws Exception {
238 return ReasoningNodeBase.this.invoke(flowState, args, pe, properties, context, processingPath);
239 }
240
241 };
242 }
243 return pin.invocable;
244 }
245 }
246
247 throw new ConfigurationException("Unknown pin name: "+pinName);
248 }
249
250 /**
251 * Ivocations come here. Depending on whether joiner is null or not, do direct call to join or through
252 * joiner.
253 * @param pe
254 * @param properties
255 * @param context
256 * @param processingPath
257 * @param args
258 * @throws Exception
259 */
260 @SuppressWarnings("unchecked")
261 protected Collection<Future<?>> invoke(
262 KnowledgeBase<F> flowState,
263 InferenceToken<F>[] args,
264 PinEntry<F> pe,
265 PropertySet properties,
266 Context context,
267 List<ProcessingPathElement> processingPath) throws Exception {
268
269 ThreadInvocation.setThreadInvocation(flowState, args, properties, context, processingPath);
270 try {
271 List<InferenceToken<F>> argsList = new ArrayList<InferenceToken<F>>();
272 for (InferenceToken<F> arg: args) {
273 argsList.add(arg);
274 }
275 if (joiner==null) { // Direct call
276 List<InferenceToken<F>>[] argsListArray = new ArrayList[1];
277 argsListArray[0] = argsList;
278 if (!partialJoin(argsListArray, 0)) {
279 return Collections.emptyList();
280 }
281 return process(flowState, argsListArray, properties, context, processingPath, null, 0);
282 } else { // Join
283 if (isFine) {
284 StringBuilder sb = new StringBuilder("New input at index "+pe.index+" for rule "+rule+", method "+method);
285 for (InferenceToken<F> it: args) {
286 sb.append(Constants.LINE_SEPARATOR+"\t"+knowledgeBase.get(it.getHandle()));
287 }
288 logger.fine(sb.toString());
289 }
290 Collection<Future<?>> ret = joiner.addInput(pe.index, argsList, new JoinContext(properties, context, processingPath));
291 if (ret==null) {
292 return Collections.emptyList();
293 }
294 return ret;
295 }
296 } finally {
297 ThreadInvocation.unsetThreadInvocation();
298 }
299
300 }
301
302 /**
303 * Processes inputs.
304 * @param properties Properties.
305 * @param context Context.
306 * @param processingPath Processing path.
307 * @param inputs Inputs.
308 * @param consumer Input consumer.
309 * @return Collection of future results.
310 * @throws Exception If anything goes wrong.
311 */
312 protected abstract Collection<Future<?>> process(
313 KnowledgeBase<F> flowState,
314 List<InferenceToken<F>>[] inputs,
315 PropertySet properties,
316 Context context,
317 List<ProcessingPathElement> processingPath,
318 InputConsumer consumer,
319 int activator) throws Exception;
320
321 @Override
322 public Invoker<KnowledgeBase<F>, InferenceToken<F>> getInvoker(String pinName, Integer connectKey) throws ConfigurationException {
323 for (PinEntry<F> pin: pins) {
324 if (pinName.equals(pin.getName())) {
325 Output<F> output = new Output<F>(pin, connectKey);
326 pin.outputs.add(output);
327 return output;
328 }
329 }
330
331 throw new ConfigurationException("Unknown pin name: "+pinName);
332 }
333
334 protected String methodName;
335
336 public void setMethodName(String methodName) {
337 this.methodName = methodName;
338 }
339
340 private int priority;
341
342 public int getPriority() {
343 return priority;
344 }
345
346 public void setPriority(int priority) {
347 this.priority = priority;
348 }
349
350 protected String ruleName;
351 protected String ruleDescription;
352
353 public void setRuleName(String ruleName) {
354 this.ruleName = ruleName;
355 }
356
357 public void setRuleDescription(String ruleDescription) {
358 this.ruleDescription = ruleDescription;
359 }
360
361 }