1 | package com.hammurapi.eventbus.local; |
2 | |
3 | import java.lang.ref.Reference; |
4 | import java.lang.ref.WeakReference; |
5 | import java.util.ArrayList; |
6 | import java.util.Collection; |
7 | import java.util.Collections; |
8 | import java.util.HashSet; |
9 | import java.util.Iterator; |
10 | import java.util.LinkedList; |
11 | import java.util.Queue; |
12 | import java.util.concurrent.ConcurrentLinkedQueue; |
13 | import java.util.logging.Level; |
14 | import java.util.logging.Logger; |
15 | |
16 | import com.hammurapi.common.ExceptionHandler; |
17 | import com.hammurapi.common.Observable; |
18 | import com.hammurapi.common.Observer; |
19 | import com.hammurapi.common.concurrent.TrackingExecutorService; |
20 | import com.hammurapi.eventbus.AbstractEventBus; |
21 | import com.hammurapi.eventbus.AbstractEventBus.Handle; |
22 | import com.hammurapi.eventbus.Derivation; |
23 | import com.hammurapi.eventbus.EventBus; |
24 | import com.hammurapi.eventbus.EventDispatchException; |
25 | import com.hammurapi.eventbus.EventDispatchMultiCauseException; |
26 | import com.hammurapi.eventbus.EventHandler; |
27 | import com.hammurapi.eventbus.EventStore; |
28 | import com.hammurapi.eventbus.InferenceCommand; |
29 | import com.hammurapi.eventbus.InferenceContext; |
30 | import com.hammurapi.eventbus.InferencePolicy; |
31 | import com.hammurapi.eventbus.PostCommand; |
32 | import com.hammurapi.eventbus.local.LocalEventBusBase.LocalHandle; |
33 | import com.hammurapi.extract.Predicate; |
34 | import com.hammurapi.store.Store; |
35 | |
36 | class MasterHandle<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> implements LocalHandle<E,P,C,S> { |
37 | private Long id; |
38 | private Collection<Derivation<E,P,C>> derivations = new HashSet<Derivation<E,P,C>>(); // Derivations of this handle |
39 | private Collection<FacadeHandle<E,P,C,S>> facades = new ConcurrentLinkedQueue<FacadeHandle<E,P,C,S>>(); |
40 | private TrackingExecutorService joinDelegate; |
41 | private Store.Handle<AbstractEventBus.Handle<E, P, C, Long>, E, S> storeHandle; |
42 | private LocalEventBusBase<E, P, C, S> bus; |
43 | private E event; |
44 | boolean directPost; |
45 | private Predicate<E, S>[] validators; |
46 | private Collection<RemoveListener<E,P,C,S>> removeListeners = new ArrayList<RemoveListener<E,P,C,S>>(); |
47 | |
48 | private static final Logger logger = Logger.getLogger(MasterHandle.class.getName()); |
49 | |
50 | // private ReadWriteLock handleLock = new ReentrantReadWriteLock(); |
51 | |
52 | private final Observer<E> observer = new Observer<E>() { |
53 | |
54 | @Override |
55 | public void update(E obj, Object... args) { |
56 | MasterHandle.this.update(); |
57 | } |
58 | |
59 | }; |
60 | |
61 | void addFacade(FacadeHandle<E,P,C,S> facade) { |
62 | facades.add(facade); |
63 | } |
64 | |
65 | private Reference<Observable<E>> observableRef; |
66 | |
67 | MasterHandle( |
68 | LocalEventBusBase<E, P, C, S> bus, |
69 | E event, |
70 | TrackingExecutorService joinDelegate, |
71 | Long id, |
72 | boolean directPost, |
73 | Predicate<E, S>[] validators) { |
74 | |
75 | this.event = event; // Temporarily until setStoreHandle is invoked if store has primary key extractor. |
76 | this.id = id; |
77 | this.joinDelegate = joinDelegate; |
78 | this.bus = bus; |
79 | this.directPost = directPost; |
80 | this.validators = validators; |
81 | |
82 | com.hammurapi.common.Observable<E> observable = toObservable(event); |
83 | if (observable!=null) { |
84 | observable.addObserver(observer); |
85 | observableRef = new WeakReference<Observable<E>>(observable); |
86 | } |
87 | |
88 | } |
89 | |
90 | @Override |
91 | public Long getId() { |
92 | return id; |
93 | } |
94 | |
95 | @Override |
96 | public Collection<Derivation<E,P,C>> getDerivations() { |
97 | synchronized (derivations) { |
98 | return Collections.unmodifiableSet(new HashSet<Derivation<E,P,C>>(derivations)); |
99 | } |
100 | } |
101 | |
102 | public void addDerivation(Long handlerId, EventHandler<E,P,C,?,?> handler, AbstractEventBus.Handle<E,P,C,Long>[] inputs) { |
103 | synchronized (derivations) { |
104 | DerivationImpl<E,P,C,S> derivation = new DerivationImpl<E,P,C,S>(handlerId, handler, inputs); |
105 | derivations.add(derivation); |
106 | derivation.setMasterHandle(this); |
107 | } |
108 | } |
109 | |
110 | @Override |
111 | public E getEvent() { |
112 | if (storeHandle==null || bus.getStore().getPrimaryKeyExtractor()==null) { |
113 | return event; |
114 | } |
115 | return storeHandle.isValid() ? storeHandle.getPrimaryKey() : null; |
116 | } |
117 | |
118 | @Override |
119 | public void remove() { |
120 | try { |
121 | final Queue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>>(); |
122 | final TrackingExecutorService hes = bus.createExecutorService(false, "Remove executor"); |
123 | |
124 | InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext = new InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S>() { |
125 | |
126 | @Override |
127 | public int getInferenceChainLength() { |
128 | return 0; |
129 | } |
130 | |
131 | @Override |
132 | public TrackingExecutorService getExecutorService() { |
133 | return hes; |
134 | } |
135 | |
136 | private ExceptionHandler rootHandle = MasterHandle.this; |
137 | |
138 | @Override |
139 | public ExceptionHandler getRootHandle() { |
140 | return rootHandle; |
141 | } |
142 | |
143 | @Override |
144 | public void setRootHandle(ExceptionHandler rootHandle) { |
145 | if (rootHandle!=this.rootHandle && this.rootHandle!=MasterHandle.this) { |
146 | throw new IllegalStateException("Root handle already set"); |
147 | } |
148 | this.rootHandle = rootHandle; |
149 | } |
150 | |
151 | @Override |
152 | public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() { |
153 | return MasterHandle.this.bus; |
154 | } |
155 | |
156 | @Override |
157 | public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createNext() { |
158 | return bus.createInferenceContext(this); |
159 | } |
160 | |
161 | @Override |
162 | public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrap() { |
163 | return bus.wrapInferenceContext(this); |
164 | } |
165 | |
166 | @Override |
167 | public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
168 | if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) { |
169 | MasterHandle.this.bus.processInferenceCommand(command); |
170 | } else { |
171 | inferenceCommandsQueue.add(command); |
172 | } |
173 | } |
174 | |
175 | @Override |
176 | public void processInferenceCommands() { |
177 | Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator(); |
178 | while (cit.hasNext()) { |
179 | MasterHandle.this.bus.processInferenceCommand(cit.next()); |
180 | cit.remove(); |
181 | } |
182 | } |
183 | |
184 | @Override |
185 | public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() { |
186 | return inferenceCommandsQueue; |
187 | } |
188 | |
189 | }; |
190 | |
191 | remove(false, inferenceContext); |
192 | if (hes!=null) { |
193 | hes.join(); |
194 | } |
195 | |
196 | while (!inferenceCommandsQueue.isEmpty()) { |
197 | for (InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) { |
198 | bus.processInferenceCommand(conclusion); |
199 | } |
200 | if (hes!=null) { |
201 | hes.join(); |
202 | } |
203 | } |
204 | } catch (Exception e) { |
205 | logger.log(Level.SEVERE, "Remove error: "+e, e); |
206 | if (bus.getExceptionHandler()!=null) { |
207 | bus.getExceptionHandler().handleException(e); |
208 | } |
209 | } |
210 | } |
211 | |
212 | /** |
213 | * When derivation is invalidated, it invokes this method to notify the handle. |
214 | * If handle is not direct post (derived) and there are no good derivations - remove handle. |
215 | * @param derivation |
216 | */ |
217 | void removeDerivation(Derivation<E,P,C> derivation, InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext) { |
218 | boolean toRemove; |
219 | synchronized (derivations) { |
220 | toRemove = derivations.remove(derivation) && !directPost && derivations.isEmpty(); |
221 | } |
222 | |
223 | if (toRemove) { |
224 | remove(false, inferenceContext); |
225 | } |
226 | } |
227 | |
228 | @Override |
229 | public boolean isDerivedFrom(E event) { |
230 | for (Derivation<E,P,C> dd: getDerivations()) { |
231 | if (dd.isDerivedFrom(event)) { |
232 | return true; |
233 | } |
234 | } |
235 | return false; |
236 | } |
237 | |
238 | @Override |
239 | public boolean isValid() { |
240 | if (!storeHandle.isValid()) { |
241 | return false; |
242 | } |
243 | |
244 | if (directPost) { |
245 | return true; |
246 | } |
247 | |
248 | synchronized (derivations) { |
249 | return !derivations.isEmpty(); // When derivation is invalidated it is removed from the derivation list. |
250 | } |
251 | } |
252 | |
253 | @Override |
254 | public void update() { |
255 | try { |
256 | final Queue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>,S>>(); |
257 | final TrackingExecutorService hes = bus.createExecutorService(false, "Remove executor"); |
258 | |
259 | InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext = new InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S>() { |
260 | |
261 | @Override |
262 | public int getInferenceChainLength() { |
263 | return 0; |
264 | } |
265 | |
266 | @Override |
267 | public TrackingExecutorService getExecutorService() { |
268 | return hes; |
269 | } |
270 | |
271 | private ExceptionHandler rootHandle = MasterHandle.this; |
272 | |
273 | @Override |
274 | public ExceptionHandler getRootHandle() { |
275 | return rootHandle; |
276 | } |
277 | |
278 | @Override |
279 | public void setRootHandle(ExceptionHandler rootHandle) { |
280 | if (rootHandle!=this.rootHandle && this.rootHandle!=MasterHandle.this) { |
281 | throw new IllegalStateException("Root handle already set"); |
282 | } |
283 | this.rootHandle = rootHandle; |
284 | } |
285 | |
286 | @Override |
287 | public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() { |
288 | return MasterHandle.this.bus; |
289 | } |
290 | |
291 | @Override |
292 | public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createNext() { |
293 | return bus.createInferenceContext(this); |
294 | } |
295 | |
296 | @Override |
297 | public InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrap() { |
298 | return bus.wrapInferenceContext(this); |
299 | } |
300 | |
301 | @Override |
302 | public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) { |
303 | if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) { |
304 | MasterHandle.this.bus.processInferenceCommand(command); |
305 | } else { |
306 | inferenceCommandsQueue.add(command); |
307 | } |
308 | } |
309 | |
310 | @Override |
311 | public void processInferenceCommands() { |
312 | Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator(); |
313 | while (cit.hasNext()) { |
314 | MasterHandle.this.bus.processInferenceCommand(cit.next()); |
315 | cit.remove(); |
316 | } |
317 | } |
318 | |
319 | @Override |
320 | public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() { |
321 | return inferenceCommandsQueue; |
322 | } |
323 | |
324 | }; |
325 | |
326 | remove(true, inferenceContext); |
327 | |
328 | InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> postCommand = new PostCommand<E, P, C, Long, Handle<E, P, C, Long>, S>(this, null, null, null, inferenceContext); |
329 | inferenceContext.postInferenceCommand(postCommand); |
330 | |
331 | if (hes!=null) { |
332 | hes.join(); |
333 | } |
334 | |
335 | while (!inferenceCommandsQueue.isEmpty()) { |
336 | for (InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> conclusion = inferenceCommandsQueue.poll(); conclusion!=null; conclusion=inferenceCommandsQueue.poll()) { |
337 | bus.processInferenceCommand(conclusion); |
338 | } |
339 | if (hes!=null) { |
340 | hes.join(); |
341 | } |
342 | } |
343 | } catch (Exception e) { |
344 | logger.log(Level.SEVERE, "Remove error: "+e, e); |
345 | if (bus.getExceptionHandler()!=null) { |
346 | bus.getExceptionHandler().handleException(e); |
347 | } |
348 | } |
349 | } |
350 | |
351 | @Override |
352 | public void join() throws InterruptedException { |
353 | if (joinDelegate!=null) { |
354 | joinDelegate.join(); |
355 | } |
356 | checkExceptions(); |
357 | } |
358 | |
359 | private void checkExceptions() { |
360 | if (exceptionCollector.isEmpty()) { |
361 | return; |
362 | } |
363 | |
364 | if (exceptionCollector.size()==1) { |
365 | Exception e = exceptionCollector.iterator().next(); |
366 | exceptionCollector.clear(); |
367 | if (e instanceof EventDispatchException) { |
368 | throw (EventDispatchException) e; |
369 | } |
370 | throw new EventDispatchException(e); |
371 | } |
372 | |
373 | Collection<Exception> ce = Collections.unmodifiableCollection(new ArrayList<Exception>(exceptionCollector)); |
374 | exceptionCollector.clear(); |
375 | throw new EventDispatchMultiCauseException(ce); |
376 | } |
377 | |
378 | @Override |
379 | public boolean join(long timeout) throws InterruptedException { |
380 | boolean ret = joinDelegate.join(timeout); |
381 | checkExceptions(); |
382 | return ret; |
383 | } |
384 | |
385 | @Override |
386 | public <H extends AbstractEventBus.Handle<E,P,C,Long>, SS extends EventStore<E, P, C, H, SS>> void setStoreHandle(Store.Handle<H, E, SS> storeHandle) { |
387 | this.storeHandle = (Store.Handle<Handle<E, P, C, Long>, E, S>) storeHandle; |
388 | if (bus.getStore().getPrimaryKeyExtractor()!=null) { |
389 | this.event = null; |
390 | } |
391 | } |
392 | |
393 | Predicate<E, S>[] getValidators() { |
394 | return validators; |
395 | } |
396 | |
397 | private Observable<E> toObservable(E obj) { |
398 | return bus.observableConverter==null ? null : bus.observableConverter.convert(obj); |
399 | } |
400 | |
401 | @Override |
402 | public String toString() { |
403 | return "MasterHandle [getId()=" + getId() + ", getEvent()=" + getEvent() + ", isValid()=" + isValid() + "]"; |
404 | } |
405 | |
406 | private Collection<Exception> exceptionCollector = Collections.synchronizedCollection(new LinkedList<Exception>()); |
407 | |
408 | @Override |
409 | public void handleException(Exception e) { |
410 | exceptionCollector.add(e); |
411 | } |
412 | |
413 | public boolean isDirectPost() { |
414 | return directPost; |
415 | } |
416 | |
417 | @Override |
418 | public void setDirectPost() { |
419 | directPost = true; |
420 | } |
421 | |
422 | public void addRemoveListener(RemoveListener<E, P, C, S> removeListener) { |
423 | synchronized (removeListeners) { |
424 | removeListeners.add(removeListener); |
425 | } |
426 | } |
427 | |
428 | @Override |
429 | public void remove(boolean forUpdate, InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> inferenceContext) { |
430 | |
431 | // Invalidate all derivatives and facades and clears all derivations. |
432 | for (FacadeHandle<E,P,C,S> fh: facades) { |
433 | fh.invalidateFacade(inferenceContext); |
434 | } |
435 | facades = new ConcurrentLinkedQueue<FacadeHandle<E,P,C,S>>(); |
436 | |
437 | synchronized (derivations) { |
438 | derivations.clear(); |
439 | } |
440 | |
441 | // Fire remove handlers. |
442 | synchronized (removeListeners) { |
443 | for (RemoveListener<E,P,C,S> rl: removeListeners) { |
444 | rl.postRetractCommand(inferenceContext); |
445 | } |
446 | removeListeners.clear(); |
447 | } |
448 | |
449 | // Remove observer if not part of update |
450 | if (!forUpdate) { |
451 | if (observableRef!=null) { |
452 | com.hammurapi.common.Observable<E> observable = observableRef.get(); |
453 | if (observable!=null) { |
454 | observable.deleteObserver(observer); |
455 | } |
456 | } |
457 | storeHandle.remove(); |
458 | } |
459 | } |
460 | |
461 | } |