001    package com.hammurapi.reasoning.impl;
002    
003    import java.lang.reflect.Method;
004    import java.util.ArrayList;
005    import java.util.Collection;
006    import java.util.Collections;
007    import java.util.Iterator;
008    import java.util.List;
009    import java.util.Set;
010    import java.util.concurrent.Future;
011    import java.util.logging.Level;
012    import java.util.logging.Logger;
013    
014    import com.hammurapi.config.bootstrap.ConfigurationException;
015    import com.hammurapi.flow.runtime.Invocable;
016    import com.hammurapi.flow.runtime.Invoker;
017    import com.hammurapi.flow.runtime.ProcessingPathElement;
018    import com.hammurapi.flow.runtime.PropertySet;
019    import com.hammurapi.flow.runtime.impl.Joiner;
020    import com.hammurapi.flow.runtime.impl.ThreadInvocation;
021    import com.hammurapi.flow.runtime.impl.Joiner.InputConsumer;
022    import com.hammurapi.util.Context;
023    
024    /**
025     * Base class for reasoning nodes
026     * @author Pavel Vlasov
027     *
028     * @param <F> Fact type.
029     */
030    public abstract class ReasoningNodeBase<F> implements com.hammurapi.flow.runtime.Node<List<?>, Integer, KnowledgeBase<F>, InferenceToken<F>> {
031            protected final Logger logger = Logger.getLogger(getClass().getName());
032            protected final boolean isFine = logger.isLoggable(Level.FINE);
033    
034            private int ruleIndex;
035            protected Object rule;
036            protected KnowledgeBase<F> knowledgeBase;
037            
038            protected Method method;
039            
040            protected Class<?>[] parameterTypes;
041    
042            public void setParameterTypes(List<Class<?>> parameterTypes) {
043                    this.parameterTypes = parameterTypes.toArray(new Class[parameterTypes.size()]);
044            }
045                    
046            @Override
047            public String toString() {
048                    StringBuilder sb = new StringBuilder();
049                    sb.append("["+getClass().getName()+"]");
050                    if (rule!=null) {
051                            sb.append(" rule="+rule);
052                    }
053                    if (method!=null) {
054                            sb.append(" method="+method);
055                    }
056                    return sb.toString();
057            }
058            
059            private class JoinContext {
060                    PropertySet properties; 
061                    Context context;
062                    List<ProcessingPathElement> processingPath;
063                    
064                    JoinContext(PropertySet properties, Context context, List<ProcessingPathElement> processingPath) {
065                            super();
066                            this.properties = properties;
067                            this.context = context;
068                            this.processingPath = processingPath;
069                    }
070                    
071            }
072            
073            private Joiner<List<InferenceToken<F>>, JoinContext, Collection<Future<?>>> joiner;
074            
075            public void setRuleIndex(int ruleIndex) {
076                    this.ruleIndex = ruleIndex;
077            }
078    
079            public void setRules(List<?> rules) throws ConfigurationException {
080                    this.rule = rules.get(ruleIndex);
081                    if (methodName!=null) {
082                            try {
083                                    method = rule.getClass().getMethod(methodName, parameterTypes);
084                            } catch (Exception e) {
085                                    throw new ConfigurationException(e);
086                            }
087                    }
088            }
089            
090    
091            public void setKnowledgeBase(KnowledgeBase<F> knowledgeBase) {
092                    this.knowledgeBase = knowledgeBase;     
093                    for (InputCollectorRequest r: inputCollectorsRequests) {
094                            r.execute();
095                    }
096                    inputCollectorsRequests.clear();
097            }
098            
099            private List<InputCollectorRequest> inputCollectorsRequests = new ArrayList<InputCollectorRequest>();
100            private ArrayList<PinEntry<F>> inputs;
101            
102            protected ArrayList<PinEntry<F>> getInputPins() {
103                    return inputs;
104            }
105            
106            private class InputCollectorRequest {
107                    private int idx;
108                    private Set<List<InferenceToken<F>>>[] inputCollectors;
109                    String pinName;
110                    
111                    InputCollectorRequest(int idx, String pinName, Set<List<InferenceToken<F>>>[] inputCollectors) {
112                            this.idx = idx;
113                            this.inputCollectors = inputCollectors;
114                            this.pinName = pinName;
115                    }
116                    
117                    @SuppressWarnings("unchecked")
118                    void execute() {
119                            inputCollectors[idx] = (Set<List<InferenceToken<F>>>) knowledgeBase.getSet(pinName, List.class);                      
120                    }
121                    
122                    
123            }
124            
125            @Override
126            public void beforeConnect() throws ConfigurationException {
127                    // NOP          
128            }
129                    
130            @SuppressWarnings("unchecked")
131            @Override
132            public void afterConnect() throws ConfigurationException {
133                    // Count input pins, if more than one - create joiner
134                    inputs = new ArrayList<PinEntry<F>>();
135                    for (PinEntry<F> pin: pins) {
136                            if (pin.getName().startsWith(Constants.INPUT)) {
137                                    inputs.add(pin);
138                            } else if (pin.getName().startsWith(Constants.OUTPUT)) {
139                                    if (pin.outputs!=null) {
140                                            Collections.sort(pin.outputs);
141                                    }
142                            }
143                    }
144                    
145                    if (inputs.isEmpty()) {
146                            throw new ConfigurationException("Inference node with no inputs");
147                    }
148                    
149                    sortInputs();
150                    
151                    if (inputs.size()>1) {                       
152                            final Set<List<InferenceToken<F>>>[] inputCollectors = new Set[inputs.size()];
153                            
154                            Iterator<PinEntry<F>> it = inputs.iterator();
155                            for (int i=0; it.hasNext(); ++i) {
156                                    PinEntry<F> pin = it.next();
157                                    pin.index = i;
158                                    inputCollectorsRequests.add(new InputCollectorRequest(i, pin.getName(), inputCollectors));
159                            }
160                            
161                            Class<?> cls =  List.class; 
162                            joiner = new Joiner<List<InferenceToken<F>>, JoinContext, Collection<Future<?>>>(inputCollectors, (Class<List<InferenceToken<F>>>) cls, false) {
163    
164                                    @Override
165                                    protected boolean isValidInput(int index, List<InferenceToken<F>> input) {
166                                            for (InferenceToken<F> t: input) {
167                                                    if (t.isConsumed() || ((HandleImpl<F>) t.getHandle()).get()==null) {
168                                                            return false;
169                                                    }
170                                            }
171                                            return true;
172                                    }
173    
174                                    @Override
175                                    protected Collection<Future<?>> join(List<InferenceToken<F>>[] inputs, JoinContext context, InputConsumer consumer, int activator) throws Exception {
176                                            return ReasoningNodeBase.this.process(
177                                                            knowledgeBase,
178                                                            inputs,
179                                                            context.properties, 
180                                                            context.context, 
181                                                            context.processingPath, 
182                                                            consumer, 
183                                                            activator);
184                                    }
185    
186                                    @Override
187                                    protected boolean partialJoin(List<InferenceToken<F>>[] inputs,     int index) throws Exception {
188                                            return ReasoningNodeBase.this.partialJoin(inputs, index);
189                                    }
190                            };
191                    }
192                    
193            }
194            
195            /**
196             * This implementation does nothing.
197             * Condition nodes can override this method to sort input pins to optimize inference by
198             * moving pins participating in condition to first places and evaluating conditions
199             * as soon as possible in partialJoin() method. 
200             * @param inputPins Input pins to sort.
201             */
202            protected void sortInputs() {
203                    // NOP
204            }
205    
206            /**
207             * Validates already joined inputs.
208             * @param inputs Inputs
209             * @param index Index of last already joined input.
210             * @return
211             */
212            protected boolean partialJoin(List<InferenceToken<F>>[] inputs, int index) throws Exception {
213                    return true;
214            }
215    
216            protected List<PinEntry<F>> pins = new ArrayList<PinEntry<F>>();
217    
218            @Override
219            public void addPin(String pinName, List<?> pinConfig) {
220                    pins.add(new PinEntry<F>(pinName, pinConfig));            
221            }
222    
223            @Override
224            public Invocable<KnowledgeBase<F>, InferenceToken<F>> getInvocable(String pinName, Integer connectKey) throws ConfigurationException {
225                    for (PinEntry<F> pin: pins) {
226                            if (pinName.equals(pin.getName())) {
227                                    if (pin.invocable==null) {
228                                            final PinEntry<F> pe = pin;
229                                            pin.invocable = new Invocable<KnowledgeBase<F>,InferenceToken<F>>() {
230    
231                                                    @Override
232                                                    public Collection<Future<?>> invoke(
233                                                                    KnowledgeBase<F> flowState,
234                                                                    InferenceToken<F>[] args,
235                                                                    PropertySet properties, 
236                                                                    Context context,
237                                                                    List<ProcessingPathElement> processingPath) throws Exception {
238                                                            return ReasoningNodeBase.this.invoke(flowState, args, pe, properties, context, processingPath);
239                                                    }
240                                                    
241                                            };
242                                    }
243                                    return pin.invocable;
244                            }
245                    }
246    
247                    throw new ConfigurationException("Unknown pin name: "+pinName);
248            }
249            
250            /**
251             * Ivocations come here. Depending on whether joiner is null or not, do direct call to join or through
252             * joiner.
253             * @param pe
254             * @param properties
255             * @param context
256             * @param processingPath
257             * @param args
258             * @throws Exception 
259             */
260            @SuppressWarnings("unchecked")
261            protected Collection<Future<?>> invoke(
262                            KnowledgeBase<F> flowState,
263                            InferenceToken<F>[] args,
264                            PinEntry<F> pe, 
265                            PropertySet properties, 
266                            Context context,
267                            List<ProcessingPathElement> processingPath) throws Exception {
268                    
269                    ThreadInvocation.setThreadInvocation(flowState, args, properties, context, processingPath);
270                    try {
271                            List<InferenceToken<F>> argsList = new ArrayList<InferenceToken<F>>();
272                            for (InferenceToken<F> arg: args) {
273                                    argsList.add(arg);
274                            }
275                            if (joiner==null) { // Direct call
276                                    List<InferenceToken<F>>[] argsListArray = new ArrayList[1];
277                                    argsListArray[0] = argsList;
278                                    if (!partialJoin(argsListArray, 0)) {
279                                            return Collections.emptyList();
280                                    }
281                                    return process(flowState, argsListArray, properties, context, processingPath, null, 0);
282                            } else { // Join
283                                    if (isFine) {
284                                            StringBuilder sb = new StringBuilder("New input at index "+pe.index+" for rule "+rule+", method "+method);
285                                            for (InferenceToken<F> it: args) {
286                                                    sb.append(Constants.LINE_SEPARATOR+"\t"+knowledgeBase.get(it.getHandle()));
287                                            }
288                                            logger.fine(sb.toString());
289                                    }
290                                    Collection<Future<?>> ret = joiner.addInput(pe.index, argsList, new JoinContext(properties, context, processingPath));
291                                    if (ret==null) {
292                                            return Collections.emptyList();
293                                    }
294                                    return ret;
295                            }
296                    } finally {
297                            ThreadInvocation.unsetThreadInvocation();
298                    }
299                    
300            }
301            
302            /**
303             * Processes inputs.
304             * @param properties Properties.
305             * @param context Context.
306             * @param processingPath Processing path.
307             * @param inputs Inputs.
308             * @param consumer Input consumer.
309             * @return Collection of future results.
310             * @throws Exception If anything goes wrong.
311             */
312            protected abstract Collection<Future<?>> process(
313                            KnowledgeBase<F> flowState, 
314                            List<InferenceToken<F>>[] inputs, 
315                            PropertySet properties, 
316                            Context context,
317                            List<ProcessingPathElement> processingPath,
318                            InputConsumer consumer,
319                            int activator) throws Exception;
320    
321            @Override
322            public Invoker<KnowledgeBase<F>, InferenceToken<F>> getInvoker(String pinName, Integer connectKey) throws ConfigurationException {
323                    for (PinEntry<F> pin: pins) {
324                            if (pinName.equals(pin.getName())) {
325                                    Output<F> output = new Output<F>(pin, connectKey);
326                                    pin.outputs.add(output);
327                                    return output;
328                            }
329                    }
330    
331                    throw new ConfigurationException("Unknown pin name: "+pinName);
332            }
333    
334            protected String methodName;
335            
336            public void setMethodName(String methodName) {
337                    this.methodName = methodName;
338            }
339            
340            private int priority;
341    
342            public int getPriority() {
343                    return priority;
344            }
345    
346            public void setPriority(int priority) {
347                    this.priority = priority;
348            }
349            
350            protected String ruleName;
351            protected String ruleDescription;
352            
353            public void setRuleName(String ruleName) {
354                    this.ruleName = ruleName;
355            }
356            
357            public void setRuleDescription(String ruleDescription) {
358                    this.ruleDescription = ruleDescription;
359            }
360                    
361    }