package com.hammurapi.common.concurrent;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.hammurapi.common.Context;
import com.hammurapi.convert.ConversionException;
import com.hammurapi.convert.Converter;

/**
 * Synapse which operates within JVM.
 * <p>
 * Only the synchronous path is implemented: the target {@link Invocable} is called in the
 * calling thread and the outcome is handed back as an already-completed {@link Future}.
 * Asynchronous and scheduled invocations are still TODO.
 *
 * @author Pavel Vlasov
 *
 * @param <C> Type of the context object passed through to the invocable.
 * @param <R> Type of a single invocation result.
 * @param <E> Checked exception type declared by the argument-iterating invocation path.
 */
public class LocalSynapse<C, R, E extends Exception> implements Synapse<C, R, String, E> {

    // TODO - Option whether to chain property set or not, e.g. no chaining for service calls
    // TODO - Option to provide sub-set path so the invocable sees only the subset of invoker's property set.
    // TODO - One way, not to waste memory for collecting returns. Do it automatically if R is Void (if possible)

    private final long delay;
    private final long period;
    private final TimeUnit timeUnit;
    private Invocable<C, R, String, E> invocable;
    private final ClassLoader classLoader;
    private Class<?>[] parameterTypes;
    private final boolean async;
    private final boolean chainPropertySet;
    private final String psPrefix;
    private final Converter converter;
    private final Context context;

//  /**
//   * Creates synapse without delay and without repetitive execution.
//   */
//  public LocalSynapse(boolean async, ClassLoader classLoader) {
//      this(async,classLoader,0,0,null,true,null);
//  }

    /**
     * Creates a synapse.
     *
     * @param async Asynchronous-invocation flag; see the review note in
     *        {@link #invoke(Object, Invocation, ExecutorService)} for how it is currently interpreted.
     * @param classLoader Class loader handed to chained property sets; when {@code null} the
     *        class loader of this class is used instead.
     * @param converter Converter used to coerce arguments to the invocable's parameter types.
     * @param context Context passed to the converter and to chained property sets.
     * @param delay Scheduling delay; a positive value together with a non-null time unit
     *        requests scheduling, which is not implemented.
     * @param period Scheduling period; a positive value together with a non-null time unit
     *        requests scheduling, which is not implemented.
     * @param timeUnit Unit of {@code delay} and {@code period}; {@code null} disables scheduling.
     * @param chainPropertySet If {@code true}, the invocation's property set is wrapped in a new
     *        {@code LocalStringPropertySet} before it is passed to the invocable.
     * @param psPrefix If not {@code null}, the invocable sees {@code propertySet.subset(psPrefix)}
     *        instead of the whole property set.
     */
    public LocalSynapse(
            boolean async,
            ClassLoader classLoader,
            Converter converter,
            Context context,
            long delay,
            long period,
            TimeUnit timeUnit,
            boolean chainPropertySet,
            String psPrefix) {
        this.async = async;
        this.classLoader = classLoader != null ? classLoader : this.getClass().getClassLoader();
        this.delay = delay;
        this.period = period;
        this.timeUnit = timeUnit;
        this.chainPropertySet = chainPropertySet;
        this.psPrefix = psPrefix;
        this.converter = converter;
        this.context = context;
    }

    /**
     * Sets the invocation target and caches its parameter types for argument conversion.
     *
     * @param invocable Target to invoke; must not be {@code null}.
     */
    @Override
    public void setInvocable(Invocable<C, R, String, E> invocable) {
        this.invocable = invocable;
        this.parameterTypes = invocable.getParameterTypes();
    }

    /**
     * Invokes the target with the arguments of the given invocation.
     * <p>
     * Arguments which are not instances of the corresponding parameter type are converted with
     * the configured converter. If any argument is an {@code ArgumentIterator}, the target is
     * invoked once per value it yields and all results are collected.
     * <p>
     * Any exception raised while preparing arguments or invoking the target is not thrown from
     * this method; it is reported as an {@link ExecutionException} from the returned future's
     * {@code get()} methods.
     *
     * @param context Context object passed through to the invocable.
     * @param invocation Invocation carrying arguments and property set.
     * @param executorService Executor service, may be {@code null}; it is passed through to the
     *        invocable and to chained property sets.
     * @return An already-completed future with the collected results, or {@code null} on the
     *         not yet implemented scheduling path.
     * @throws InvocationException If scheduling was requested but cannot be performed.
     */
    @Override
    public Future<Iterable<R>> invoke(final C context, final Invocation<String> invocation, ExecutorService executorService) {
        // TODO - control invocation chain, substitute invocation and add invocation chain element.

        // Synchronous invocation
        // NOTE(review): this branch is entered when async is TRUE, which looks inverted relative to
        // the flag's name. It is kept as is because the asynchronous path below is not implemented
        // and flipping the test would break callers which currently work - confirm the intent.
        if (async || executorService == null) {
            if (timeUnit != null && (delay > 0 || period > 0)) {
                throw new InvocationException("Cannot schedule invocations");
            }

            try {
                return completedFuture(invokeNow(context, invocation, executorService));
            } catch (final Exception e) {
                return failedFuture(e);
            }
        }

        if (timeUnit == null || (delay <= 0 && period <= 0)) {
            // TODO - Asynchronous invocations.
        }

        if (!(executorService instanceof ScheduledExecutorService)) {
            throw new InvocationException("Cannot schedule invocations - executor service is not a ScheduledExecutorService");
        }

        // TODO Scheduled invocations.
        // NOTE(review): callers receive a null Future here until scheduling is implemented.
        return null;
    }

    /**
     * Performs the invocation in the calling thread and collects the results.
     */
    private Collection<R> invokeNow(C context, Invocation<String> invocation, ExecutorService executorService) throws E {
        Collection<R> results = new ArrayList<R>();
        Object[] arguments = invocation.getArguments();

        boolean hasArgumentIterators = false;
        boolean needsConversion = false;
        int idx = 0;
        for (Object arg : arguments) {
            if (arg instanceof ArgumentIterator) {
                // The iterating path converts every argument itself, no need to scan further.
                hasArgumentIterators = true;
                break;
            }
            if (arg != null && !parameterTypes[idx].isInstance(arg)) {
                needsConversion = true;
            }
            ++idx;
        }

        if (hasArgumentIterators) {
            // Work on a copy - iteration overwrites slots of the array.
            Object[] args = Arrays.copyOf(arguments, arguments.length);
            _invoke(context, invocation, executorService, args, 0, results);
        } else if (needsConversion) {
            Object[] args = Arrays.copyOf(arguments, arguments.length);
            for (int i = 0; i < args.length; ++i) {
                args[i] = convert(args[i], i);
            }
            // The converted arguments must be installed into the derived invocation,
            // otherwise the invocable would still receive the unconverted ones.
            results.add(invokeWith(context, invocation, executorService, args));
        } else {
            // NOTE(review): on this fast path the original invocation is passed as is, i.e. the
            // property set is neither chained nor sub-set - confirm that this is intended.
            results.add(invocable.invoke(context, invocation, executorService));
        }
        return results;
    }

    /**
     * Invokes the target with a copy of the invocation carrying the given arguments and the
     * prepared property set.
     */
    private R invokeWith(C context, Invocation<String> invocation, ExecutorService executorService, Object[] args) throws E {
        InvocationImpl<String> iv = new InvocationImpl<String>(invocation);
        iv.setArguments(args);
        iv.setPropertySet(preparePropertySet(executorService, invocation.getPropertySet()));
        return invocable.invoke(context, iv, executorService);
    }

    /**
     * Wraps already available results into a future which is done from the start.
     */
    private Future<Iterable<R>> completedFuture(final Iterable<R> results) {
        return new Future<Iterable<R>>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public Iterable<R> get() {
                return results;
            }

            @Override
            public Iterable<R> get(long timeout, TimeUnit unit) {
                return results;
            }
        };
    }

    /**
     * Wraps a failure into a future which is done from the start and whose {@code get()}
     * methods throw {@link ExecutionException} with the failure as the cause.
     */
    private Future<Iterable<R>> failedFuture(final Exception failure) {
        return new Future<Iterable<R>>() {

            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                return false;
            }

            @Override
            public boolean isCancelled() {
                return false;
            }

            @Override
            public boolean isDone() {
                return true;
            }

            @Override
            public Iterable<R> get() throws ExecutionException {
                throw new ExecutionException(failure);
            }

            @Override
            public Iterable<R> get(long timeout, TimeUnit unit) throws ExecutionException {
                throw new ExecutionException(failure);
            }
        };
    }

    /**
     * Derives the property set which the invocable shall see.
     *
     * @param executorService Executor service passed to a chained property set.
     * @param propertySet Property set of the invocation, may be {@code null}.
     * @return The property set narrowed to {@code psPrefix} (if configured) and wrapped in a
     *         {@code LocalStringPropertySet} (if chaining is configured); {@code null} if there
     *         is no source property set and chaining is off.
     */
    private PropertySet<String> preparePropertySet(ExecutorService executorService, PropertySet<String> propertySet) {
        if (propertySet == null) {
            if (chainPropertySet) {
                return new LocalStringPropertySet(executorService, converter, context, classLoader);
            }
            return null;
        }

        if (psPrefix != null) {
            if (chainPropertySet) {
                return new LocalStringPropertySet(executorService, converter, context, classLoader, propertySet.subset(psPrefix));
            }
            return propertySet.subset(psPrefix);
        }
        if (chainPropertySet) {
            return new LocalStringPropertySet(executorService, converter, context, classLoader, propertySet);
        }
        return propertySet;
    }

    /**
     * Iterates over argument iterators: walks the argument array from {@code idx} on, converting
     * plain arguments and expanding each {@code ArgumentIterator} into its values, and invokes
     * the target once the end of the array is reached.
     *
     * @param context Context object passed through to the invocable.
     * @param invocation Source invocation.
     * @param executorService Executor service, may be {@code null}.
     * @param args Working copy of the arguments; its slots are overwritten during iteration.
     * @param idx Current argument index.
     * @param collector Receives the result of every invocation.
     */
    private void _invoke(final C context, final Invocation<String> invocation, ExecutorService executorService, final Object[] args, int idx, Collection<R> collector) throws E {
        if (idx == args.length) {
            // NOTE(review): the same array instance is handed to every invocation.
            collector.add(invokeWith(context, invocation, executorService, args));
            return;
        }

        if (args[idx] instanceof ArgumentIterator) {
            @SuppressWarnings("unchecked")
            ArgumentIterator<Object> ai = (ArgumentIterator<Object>) args[idx];
            // NOTE(review): the iterator's slot is overwritten with its values and not restored, so
            // with more than one iterator argument only the first pass expands the later ones;
            // subsequent passes see the last value left in the slot - confirm the intended semantics.
            while (ai.hasNext()) {
                args[idx] = convert(ai.next(), idx);
                _invoke(context, invocation, executorService, args, idx + 1, collector);
            }
        } else {
            if (args[idx] != null && !parameterTypes[idx].isInstance(args[idx])) {
                args[idx] = convert(args[idx], idx);
            }
            _invoke(context, invocation, executorService, args, idx + 1, collector);
        }
    }

    /**
     * Coerces an argument to the parameter type at the given index.
     *
     * @param arg Argument value, may be {@code null}.
     * @param idx Index of the parameter the argument is bound to.
     * @return {@code arg} itself if it is {@code null} or already an instance of the parameter
     *         type, otherwise the converted value.
     * @throws ConversionException If the converter yields {@code null} for the argument.
     */
    private Object convert(Object arg, int idx) {
        if (arg == null || parameterTypes[idx].isInstance(arg)) {
            // Nothing to convert - pass the value through instead of dropping it.
            return arg;
        }
        Object ret = converter.convert(arg, parameterTypes[idx], context);
        if (ret == null) {
            throw new ConversionException("Cannot convert "+arg+" to "+parameterTypes[idx]);
        }
        return ret;
    }

    /**
     * @return {@code Future.class}, cast to the parameterized return type.
     */
    @Override
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public Class<Future<Iterable<R>>> getReturnType() {
        // Strange, but doesn't compile otherwise.
        return (Class<Future<Iterable<R>>>) (Class) Future.class;
    }

    /**
     * @return Parameter types reported by the invocable.
     */
    @Override
    public Class<?>[] getParameterTypes() {
        return invocable.getParameterTypes();
    }

}