001package com.hammurapi.common.concurrent;
002
003import java.util.ArrayList;
004import java.util.Arrays;
005import java.util.Collection;
006import java.util.concurrent.ExecutionException;
007import java.util.concurrent.ExecutorService;
008import java.util.concurrent.Future;
009import java.util.concurrent.ScheduledExecutorService;
010import java.util.concurrent.TimeUnit;
011
012import com.hammurapi.common.Context;
013import com.hammurapi.convert.ConversionException;
014import com.hammurapi.convert.Converter;
015
016/**
017 * Synapse which operates within JVM.
018 * @author Pavel Vlasov
019 *
020 * @param <R>
021 */
022public class LocalSynapse<C, R, E extends Exception> implements Synapse<C, R, String, E> {
023        
024        // TODO - Option whether to chain property set or not, e.g. no chaining for service calls
025        // TODO - Option to provide sub-set path so the invocable sees only the subset of invoker's property set.
026        // TODO - One way, not to waste memory for collecting returns. Do it automatically if R is Void (if possible)
027        
028        private long delay;
029        private long period;
030        private TimeUnit timeUnit;
031        private Invocable<C, R, String, E> invocable;
032        private ClassLoader classLoader;
033        private Class<?>[] parameterTypes;
034        private boolean async;
035        private boolean chainPropertySet;
036        private String psPrefix;
037        private Converter converter;
038        private Context context;
039
040//      /**
041//       * Creates synapse without delay and without repetitive execution.
042//       */
043//      public LocalSynapse(boolean async, ClassLoader classLoader) {
044//              this(async,classLoader,0,0,null,true,null);
045//      }
046        
047        public LocalSynapse(
048                        boolean async, 
049                        ClassLoader classLoader, 
050                        Converter converter,
051                        Context context,
052                        long delay, 
053                        long period, 
054                        TimeUnit timeUnit,
055                        boolean chainPropertySet,
056                        String psPrefix) {
057                this.async = async;
058                this.classLoader = classLoader;
059                if (this.classLoader==null) {
060                        this.classLoader = this.getClass().getClassLoader();
061                }
062                this.delay = delay;
063                this.period = period;
064                this.timeUnit = timeUnit;       
065                this.chainPropertySet = chainPropertySet;
066                this.psPrefix = psPrefix;
067                this.converter = converter;
068                this.context = context;
069        }
070        
071        @Override
072        public void setInvocable(Invocable<C, R, String, E> invocable) {
073                this.invocable = invocable;
074                this.parameterTypes = invocable.getParameterTypes();
075        }
076        
077        @Override
078        public Future<Iterable<R>> invoke(final C context, final Invocation<String> invocation, ExecutorService executorService) {
079                // TODO - control invocation chain, substitute invocation and add invocation chain element.
080                // Synchronous invocation
081                if (async || executorService==null) {
082                        if (timeUnit!=null && (delay>0 || period>0)) {
083                                throw new InvocationException("Cannot schedule invocations");
084                        }
085                        
086                        final Collection<R> ret = new ArrayList<R>();
087                        try {
088                                boolean hasArgumentIterators = false;
089                                boolean needsConversion = false;
090                                int idx=0;
091                                for (Object arg: invocation.getArguments()) {
092                                        if (arg instanceof ArgumentIterator) {
093                                                hasArgumentIterators = true;
094                                                break;
095                                        }                                       
096                                        if (arg!=null && !parameterTypes[idx].isInstance(arg)) {
097                                                needsConversion = true;
098                                        }
099                                        ++idx;
100                                }
101                                
102                                if (hasArgumentIterators) {
103                                        Object[] args = Arrays.copyOf(invocation.getArguments(), invocation.getArguments().length);
104                                        _invoke(context, invocation, executorService, args, 0, ret);                                    
105                                } else {
106                                        if (needsConversion) {
107                                                final Object[] args = Arrays.copyOf(invocation.getArguments(), invocation.getArguments().length);
108                                                for (int i=0; i<args.length; ++i) {
109                                                        args[i] = convert(args[i], i);
110                                                }
111                                                InvocationImpl<String> iv = new InvocationImpl<String>(invocation);
112                                                iv.setPropertySet(preparePropertySet(executorService, invocation.getPropertySet()));
113                                                ret.add(invocable.invoke(context, iv, executorService));
114                                        } else {
115                                                ret.add(invocable.invoke(context, invocation, executorService));
116                                        }
117                                }
118                                
119                                return new Future<Iterable<R>>() {
120        
121                                        @Override
122                                        public boolean cancel(boolean mayInterruptIfRunning) {
123                                                return false;
124                                        }
125        
126                                        @Override
127                                        public boolean isCancelled() {
128                                                return false;
129                                        }
130        
131                                        @Override
132                                        public boolean isDone() {
133                                                return true;
134                                        }
135        
136                                        @Override
137                                        public Iterable<R> get() {
138                                                return ret; 
139                                        }
140        
141                                        @Override
142                                        public Iterable<R> get(long timeout, TimeUnit unit) {
143                                                return ret;  
144                                        }                               
145                                };
146                        } catch (final Exception e) {
147                                return new Future<Iterable<R>>() {
148                                        
149                                        @Override
150                                        public boolean cancel(boolean mayInterruptIfRunning) {
151                                                return false;
152                                        }
153        
154                                        @Override
155                                        public boolean isCancelled() {
156                                                return false;
157                                        }
158        
159                                        @Override
160                                        public boolean isDone() {
161                                                return true;
162                                        }
163        
164                                        @Override
165                                        public Iterable<R> get() throws ExecutionException {
166                                                throw new ExecutionException(e); 
167                                        }
168        
169                                        @Override
170                                        public Iterable<R> get(long timeout, TimeUnit unit) throws ExecutionException {
171                                                throw new ExecutionException(e); 
172                                        }                               
173                                };
174                                
175                        }
176                }
177                
178                if (timeUnit==null || (delay<=0 && period<=0)) {
179                        // TODO - Asynchronous invocations.
180                        
181                        
182                }
183                
184                if (!(executorService instanceof ScheduledExecutorService)) {
185                        throw new InvocationException("Cannot schedule invocations - executor service is not a ScheduledExecutorService");
186                }
187                
188                // TODO Scheduled invocations.
189                return null;
190        }
191
192        private PropertySet<String> preparePropertySet(ExecutorService executorService, PropertySet<String> propertySet) {
193                if (propertySet==null) {
194                        if (chainPropertySet) {
195                                return new LocalStringPropertySet(executorService, converter, context, classLoader);
196                        }
197                        return null;
198                }
199                
200                if (psPrefix!=null) {
201                        if (chainPropertySet) {
202                                return new LocalStringPropertySet(executorService, converter, context, classLoader, propertySet.subset(psPrefix));
203                        }
204                        return propertySet.subset(psPrefix);                    
205                }
206                if (chainPropertySet) {
207                        return new LocalStringPropertySet(executorService, converter, context, classLoader, propertySet);
208                }
209                return propertySet;                     
210        }
211
212        /**
213         * Iterates over argument iterators
214         * @param invocation
215         * @param executorService
216         * @param args
217         * @param i Current argument index
218         */
219        private void _invoke(final C context, final Invocation<String> invocation, ExecutorService executorService, final Object[] args, int idx, Collection<R> collector) throws E {
220                if (idx==args.length) {
221                        InvocationImpl<String> iv = new InvocationImpl<String>(invocation);
222                        iv.setArguments(args);
223                        iv.setPropertySet(preparePropertySet(executorService, invocation.getPropertySet()));
224                        
225                        collector.add(invocable.invoke(context, iv, executorService));
226                } else {
227                        if (args[idx] instanceof ArgumentIterator) {
228                                ArgumentIterator<Object> ai = (ArgumentIterator<Object>) args[idx];
229                                while (ai.hasNext()) {
230                                        args[idx] = convert(ai.next(), idx);
231                                        _invoke(context, invocation, executorService, args, idx+1, collector);
232                                }
233                        } else {
234                                if (args[idx]!=null && !parameterTypes[idx].isInstance(args[idx])) {
235                                        args[idx] = convert(args[idx], idx);
236                                }
237                                _invoke(context, invocation, executorService, args, idx+1, collector);
238                        }
239                }
240                
241        }
242        
243        private Object convert(Object arg, int idx) {
244                if (arg==null || parameterTypes[idx].isInstance(arg)) {
245                        return null;
246                }
247                Object ret = converter.convert(arg, parameterTypes[idx], context);
248                if (ret==null) {
249                        throw new ConversionException("Cannot convert "+arg+" to "+parameterTypes[idx]);
250                }
251                return ret;
252        }
253
254        @Override
255        public Class<Future<Iterable<R>>> getReturnType() {
256                // Strange, but doesn't compile otherwise.
257                return (Class<Future<Iterable<R>>>) (Class) Future.class;
258        }
259
260        @Override
261        public Class<?>[] getParameterTypes() {
262                return invocable.getParameterTypes();
263        }               
264
265}