001    package com.hammurapi.reasoning.impl;
002    
003    import java.lang.reflect.Method;
004    import java.util.ArrayList;
005    import java.util.Collection;
006    import java.util.List;
007    import java.util.concurrent.Executor;
008    import java.util.concurrent.Future;
009    import java.util.concurrent.ScheduledExecutorService;
010    
011    import com.hammurapi.config.bootstrap.ConfigurationException;
012    import com.hammurapi.config.runtime.ConfigurationContext;
013    import com.hammurapi.flow.Flow;
014    import com.hammurapi.flow.runtime.Invocable;
015    import com.hammurapi.flow.runtime.Invoker;
016    import com.hammurapi.flow.runtime.ProcessingPathElement;
017    import com.hammurapi.flow.runtime.PropertySet;
018    import com.hammurapi.flow.runtime.PropertySetFactory;
019    import com.hammurapi.flow.runtime.SynapseFactory;
020    import com.hammurapi.flow.runtime.Transition;
021    import com.hammurapi.flow.runtime.impl.ExecutorSynapseFactory;
022    import com.hammurapi.flow.runtime.impl.MapPropertySetFactory;
023    import com.hammurapi.flow.runtime.impl.TaskCountingFlowBase;
024    import com.hammurapi.reasoning.ExceptionHandler;
025    import com.hammurapi.reasoning.ReasoningException;
026    import com.hammurapi.util.Context;
027    
028    /**
029     * Flow which implements rule set.
030     * @author Pavel Vlasov
031     *
032     */
033    public class RuleSetFlow<F> extends TaskCountingFlowBase<ReasoningNodeBase<F>, Transition<KnowledgeBase<F>, InferenceToken<F>>, List<?>, Integer, KnowledgeBase<F>, InferenceToken<F>> {
034            
035            private List<?> rules;
036            private KnowledgeBase<F> knowledgeBase;
037            private Executor executor;
038            private ScheduledExecutorService scheduler;
039            private PropertySetFactory propertySetFactory;
040            
041            private List<Exception> collectedExceptions = new ArrayList<Exception>();
042            
043            private ExceptionHandler exceptionHandler = new ExceptionHandler() {
044                    
045                    @Override
046                    public void handleException(Exception e) throws Exception {
047                            synchronized (collectedExceptions) {
048                                    collectedExceptions.add(e);
049                            }
050                    }
051            };
052            
053            public void checkCollectedExceptions() throws RuleExecutionException {
054                    synchronized (collectedExceptions) {
055                            if (!collectedExceptions.isEmpty()) {
056                                    try {
057                                            throw new RuleExecutionException(collectedExceptions);
058                                    } finally {
059                                            collectedExceptions.clear();
060                                    }
061                            }
062                    }
063            }
064            
065            public KnowledgeBase<F> getKnowledgeBase() {
066                    return knowledgeBase;
067            }
068            
069            public void setKnowledgeBase(KnowledgeBase<F> knowledgeBase) {
070                    this.knowledgeBase = knowledgeBase;
071            }
072            
073            public void setExecutor(Executor executor) {
074                    this.executor = executor;
075            }
076            
077            public Executor getExecutor() {
078                    return executor;
079            }
080            
081            public void setPropertySetFactory(PropertySetFactory propertySetFactory) {
082                    this.propertySetFactory = propertySetFactory;
083            }
084            
085            public PropertySetFactory getPropertySetFactory() {
086                    return propertySetFactory;
087            }
088            
089            public void setScheduler(ScheduledExecutorService scheduler) {
090                    this.scheduler = scheduler;
091            }
092            
093            public ScheduledExecutorService getScheduler() {
094                    return scheduler;
095            }
096                    
097            @Override
098            public void init(ConfigurationContext<Flow> context) throws ConfigurationException {
099                    if (executor==null) {
100                            executor = context.lookup(Executor.class);
101                    }
102                    
103                    if (scheduler==null) {
104                            scheduler = context.lookup(ScheduledExecutorService.class);
105                    }
106                    
107                    if (knowledgeBase==null) {
108                            Flow def = context.getDefinition();                     
109                            knowledgeBase = new InMemoryKnowledgeBase<F>(def==null ? "Knowledge base" : def.getName(), def==null ? null : def.getDescription(), false);
110                            ((InMemoryKnowledgeBase<F>) knowledgeBase).setExecutor(executor);
111                    }       
112                    
113                    if (propertySetFactory==null) {
114                            propertySetFactory = context.lookup(PropertySetFactory.class);
115                            if (propertySetFactory==null) {
116                                    propertySetFactory = new MapPropertySetFactory(executor, scheduler);
117                            }
118                    }
119                    
120                    if (getSynapseFactory()==null) {
121                            setSynapseFactory(context.lookup(SynapseFactory.class));
122                    }
123                    
124                    if (getSynapseFactory()==null && executor!=null) {
125                            PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>> pesf = new PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>>();
126                            setSynapseFactory(pesf);
127                            pesf.setExecutor(executor);
128                            pesf.setScheduler(scheduler);
129                            pesf.setPropertySetFactory(propertySetFactory);
130                    } else if (getSynapseFactory() instanceof ExecutorSynapseFactory) {
131                            ExecutorSynapseFactory<Integer, KnowledgeBase<F>, InferenceToken<F>> esf = (ExecutorSynapseFactory<Integer, KnowledgeBase<F>, InferenceToken<F>>) getSynapseFactory();
132                            esf.setExecutor(executor);
133                            esf.setScheduler(scheduler);
134                            esf.setPropertySetFactory(propertySetFactory);                  
135                    }
136                    
137                    if (getSynapseFactory() instanceof PriorityExecutorSynapseFactory) {
138                            ((PriorityExecutorSynapseFactory<KnowledgeBase<F>, InferenceToken<F>>) getSynapseFactory()).setExceptionHandler(exceptionHandler);
139                    }
140                    
141                    
142                    super.init(context);
143                    
144                    if (transitions!=null) {
145                            for (Transition<KnowledgeBase<F>, InferenceToken<F>> t:transitions) {
146                                    if (t instanceof ReasoningTransitionBase) {
147                                            ((ReasoningTransitionBase) t).setKnowledgeBase(knowledgeBase);
148                                    }
149                            }
150                    }
151                    if (nodes!=null) {
152                            for (ReasoningNodeBase<F> n:nodes) {
153                                    n.setKnowledgeBase(knowledgeBase);
154                            }
155                    }               
156            }
157    
158            public void setRules(List<?> rules) throws ConfigurationException {
159                    this.rules = rules;
160                    if (transitions!=null) {
161                            for (Transition<KnowledgeBase<F>,InferenceToken<F>> t:transitions) {
162                                    if (t instanceof ReasoningTransitionBase) {
163                                            ((ReasoningTransitionBase) t).setRules(rules);
164                                    }
165                            }
166                    }
167                    if (nodes!=null) {
168                            for (ReasoningNodeBase<F> n:nodes) {
169                                    n.setRules(rules);
170                            }
171                    }
172            }
173            
174            @Override
175            public void setTransitions(List<Transition<KnowledgeBase<F>,InferenceToken<F>>> transitions) throws ConfigurationException {
176                    super.setTransitions(transitions);
177                    for (Transition<KnowledgeBase<F>,InferenceToken<F>> t:transitions) {
178                            if (t instanceof ReasoningTransitionBase) {
179                                    if (rules!=null) {
180                                            ((ReasoningTransitionBase) t).setRules(rules);                                  
181                                    }
182                                    if (knowledgeBase!=null) {
183                                            ((ReasoningTransitionBase) t).setKnowledgeBase(knowledgeBase);
184                                    }
185                            }
186                    }
187            }
188            
189            @Override
190            public void setNodes(List<ReasoningNodeBase<F>> nodes) throws ConfigurationException {
191                    super.setNodes(nodes);
192                    for (ReasoningNodeBase<F> n:nodes) {
193                            if (rules!=null) {
194                                    n.setRules(rules);
195                            }
196                            if (knowledgeBase!=null) {
197                                    n.setKnowledgeBase(knowledgeBase);
198                            }
199                    }
200            }
201    
202            @Override
203            protected Invocable<KnowledgeBase<F>,InferenceToken<F>> getInternalInvocable(String pinName, Integer connectKey) {
204                    throw new UnsupportedOperationException("RuleSet flow doesn't have outputs");
205            }
206    
207            @Override
208            protected Invoker<KnowledgeBase<F>,InferenceToken<F>> getInternalInvoker(String pinName, Integer connectKey) {
209                    if (pinName.equals(Constants.PUT)) {
210                            return new Invoker<KnowledgeBase<F>,InferenceToken<F>>() {
211            
212                                    @Override
213                                    public void setInvocable(Invocable<KnowledgeBase<F>,InferenceToken<F>> invocable) throws IllegalStateException {
214                                            synchronized (addObjectInvocables) {
215                                                    addObjectInvocables.add(invocable);
216                                            }
217                                    }
218                                    
219                            };
220                    }
221                    throw new IllegalArgumentException("Invalid pin name: "+pinName);
222            }
223    
224            @Override
225            public void setBackEnd(Object object) {
226                    throw new UnsupportedOperationException("RuleSet flow doesn't support back-end object.");
227            }
228            
229            private Collection<Invocable<KnowledgeBase<F>,InferenceToken<F>>> addObjectInvocables = new ArrayList<Invocable<KnowledgeBase<F>,InferenceToken<F>>>();
230            
231            @Override
232            public Invocable<KnowledgeBase<F>,InferenceToken<F>> getInvocable(String pinName, Integer connectKey) throws ConfigurationException {
233                    if (pinName.equals(Constants.PUT)) {
234                            return new Invocable<KnowledgeBase<F>,InferenceToken<F>>() {
235    
236                                    @Override
237                                    public Collection<Future<?>> invoke(
238                                                    KnowledgeBase<F> flowState,
239                                                    InferenceToken<F>[] args,
240                                                    PropertySet properties,
241                                                    Context context,
242                                                    List<ProcessingPathElement> processingPath) throws Exception {                                    
243                                            Collection<Future<?>> ret = new ArrayList<Future<?>>();
244                                            
245                                            for (Invocable<KnowledgeBase<F>,InferenceToken<F>> i: addObjectInvocables) {
246                                                    ret.addAll(i.invoke(flowState, args, properties, context, processingPath));
247                                            }
248                                            return ret;
249                                    }
250                                    
251                            };
252                    } 
253                    throw new IllegalArgumentException("Invalid pin name: "+pinName);
254            }
255    
256            @Override
257            public Invoker<KnowledgeBase<F>,InferenceToken<F>> getInvoker(String pinName, Integer connectKey) throws ConfigurationException {
258                    throw new UnsupportedOperationException("RuleSet flow doesn't have outputs.");
259            }
260            
261            @Override
262            protected Object convertResult(Object proxy, Method method, Object[] args, Collection<Future<?>> invocationResult) throws Exception {
263                    return invocationResult.iterator().next().get();
264            }
265            
266            private List<PinEntry<F>> pins = new ArrayList<PinEntry<F>>();
267    
268            @Override
269            public void addPin(String pinName, List<?> pinConfig) {
270                    pins.add(new PinEntry<F>(pinName, pinConfig));            
271            }
272                    
273            private String name;
274            private String description;
275    
276            public String getDescription() {
277                    return description;
278            }
279    
280            public String getName() {
281                    return name;
282            }
283            
284            public void setName(String name) {
285                    this.name = name;
286            }
287            
288            public void setDescription(String description) {
289                    this.description = description;
290            }
291            
292            void reset() throws ReasoningException {
293                    for (Object rule: rules) {
294                            if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
295                                    ((com.hammurapi.reasoning.spi.Rule) rule).reset();
296                            }
297                    }
298            }
299            
300            void release() throws ReasoningException {
301                    for (Object rule: rules) {
302                            if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
303                                    ((com.hammurapi.reasoning.spi.Rule) rule).release();
304                            }
305                    }
306            }
307            
308            void initRules() throws ReasoningException {
309                    for (Object rule: rules) {
310                            if (rule instanceof com.hammurapi.reasoning.spi.Rule) {
311                                    ((com.hammurapi.reasoning.spi.Rule) rule).init();
312                            }
313                    }
314            }
315            
316    }