001    package com.hammurapi.eventbus.local;
002    
003    import java.util.Iterator;
004    import java.util.Queue;
005    import java.util.concurrent.ConcurrentLinkedQueue;
006    import java.util.concurrent.ExecutorService;
007    import java.util.concurrent.TimeUnit;
008    import java.util.concurrent.atomic.AtomicLong;
009    import java.util.concurrent.locks.Lock;
010    import java.util.concurrent.locks.ReentrantLock;
011    import java.util.logging.Logger;
012    
013    import com.hammurapi.common.ExceptionHandler;
014    import com.hammurapi.common.ObservableConverter;
015    import com.hammurapi.common.concurrent.LocalTrackingExecutorService;
016    import com.hammurapi.common.concurrent.TrackingExecutorService;
017    import com.hammurapi.eventbus.AbstractEventBus;
018    import com.hammurapi.eventbus.CompositeInferenceFilter;
019    import com.hammurapi.eventbus.EventBus;
020    import com.hammurapi.eventbus.EventStore;
021    import com.hammurapi.eventbus.InferenceCommand;
022    import com.hammurapi.eventbus.InferenceContext;
023    import com.hammurapi.eventbus.InferenceFilter;
024    import com.hammurapi.eventbus.InferencePolicy;
025    import com.hammurapi.eventbus.Matcher;
026    import com.hammurapi.eventbus.PostCommand;
027    import com.hammurapi.eventbus.RemoveCommand;
028    import com.hammurapi.eventbus.monitoring.StatsCollector;
029    import com.hammurapi.store.local.LocalHandle.HandleStrength;
030    
031    /**
032     * Base class for event bus which functions within JVM boundaries. 
033     * @author Pavel Vlasov
034     *
035     * @param <E> Event type.
036     * @param <P> Handler priority type.
037     * @param <C> Context type.
038     * @param <K> Registration key type.
039     */
040    
041    public class LocalEventBusBase<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> 
042            extends AbstractEventBus<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>  {
043            
044            
045            interface LocalHandle<E,P extends Comparable<P>,C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> extends AbstractEventBus.Handle<E, P, C, Long> {
046                    void addRemoveListener(RemoveListener<E, P, C, S> removeListener);
047                    void remove(boolean forUpdate, InferenceContext<E,P,C,Long,AbstractEventBus.Handle<E,P,C,Long>,S> inferenceContext);
048            }
049                    
050            private static final Logger logger = Logger.getLogger(LocalEventBusBase.class.getName());
051            private LocalTrackingExecutorService executorService;
052            HandleStrength collectorHandleStrength;
053            private int maxDerivationDepth = 100;
054            S unmodifiableStore;
055            ObservableConverter<E> observableConverter;
056            
057            public int getMaxDerivationDepth() {
058                    return maxDerivationDepth;
059            }
060            
061            public void setMaxDerivationDepth(int maxDerivationDepth) {
062                    this.maxDerivationDepth = maxDerivationDepth;
063            }
064            
065            /**
066             * Bus configurator.
067             * @author Pavel Vlasov
068             *
069             */
070            public static class Config<E, P extends Comparable<P>, C, S extends EventStore<E, P, C, AbstractEventBus.Handle<E, P, C, Long>, S>> {
071                    private Class<E> eventType;
072                    private S store;
073                    private InferencePolicy inferencePolicy = InferencePolicy.IMMEDIATELY;
074                    private ExecutorService executorService; 
075                    private HandleStrength collectorHandleStrength = HandleStrength.STRONG;
076                    private ObservableConverter<E> observableConverter;
077                    private boolean assertPredicatesBeforePost;
078                    private StatsCollector statsCollector;
079                    private TimeUnit statsTimeUnit;
080                    private InferenceFilter<E,P,C,Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] inferenceFilters;
081                    private LocalMatcher<E, P, C, S> matcher = new LocalPredicateChainingMatcher<E, P, C, S>();
082                    
083                    public void setStatsCollector(StatsCollector statsCollector) {
084                            this.statsCollector = statsCollector;
085                    }
086                    
087                    public StatsCollector getStatsCollector() {
088                            return statsCollector;
089                    }
090                    
091                    public Class<E> getEventType() {
092                            return eventType;
093                    }
094                    public void setEventType(Class<E> eventType) {
095                            this.eventType = eventType;
096                    }
097                    public S getStore() {
098                            return store;
099                    }
100                    public void setStore(S store) {
101                            this.store = store;
102                    }
103                    public InferencePolicy getInferencePolicy() {
104                            return inferencePolicy;
105                    }
106                    public void setInferencePolicy(InferencePolicy inferencePolicy) {
107                            this.inferencePolicy = inferencePolicy;
108                    }
109                    public ExecutorService getExecutorService() {
110                            return executorService;
111                    }
112                    public void setExecutorService(ExecutorService executorService) {
113                            this.executorService = executorService;
114                    }
115                    public HandleStrength getCollectorHandleStrength() {
116                            return collectorHandleStrength;
117                    }
118                    public void setCollectorHandleStrength(HandleStrength collectorHandleStrength) {
119                            this.collectorHandleStrength = collectorHandleStrength;
120                    }
121                    public ObservableConverter<E> getObservableConverter() {
122                            return observableConverter;
123                    }
124                    public void setObservableConverter(ObservableConverter<E> observableConverter) {
125                            this.observableConverter = observableConverter;
126                    }
127                    public boolean isAssertPredicatesBeforePost() {
128                            return assertPredicatesBeforePost;
129                    }
130                    public void setAssertPredicatesBeforePost(boolean assertPredicatesBeforePost) {
131                            this.assertPredicatesBeforePost = assertPredicatesBeforePost;
132                    }               
133                    public TimeUnit getStatsTimeUnit() {
134                            return statsTimeUnit;
135                    }               
136                    public void setStatsTimeUnit(TimeUnit statsTimeUnit) {
137                            this.statsTimeUnit = statsTimeUnit;
138                    }
139                    public void setInferenceFilter(InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>... inferenceFilters) {
140                            this.inferenceFilters = inferenceFilters;
141                    }
142                    public InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S>[] getInferenceFilters() {
143                            return inferenceFilters;
144                    }
145                    
146                    public Matcher<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getMatcher() {
147                            return matcher;
148                    }
149                    
150                    public void setMatcher(LocalMatcher<E, P, C, S> matcher) {
151                            this.matcher = matcher;
152                    }
153                    
154                    InferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E, P, C, Long>, S> getInferenceFilter() {
155                            if (inferenceFilters==null || inferenceFilters.length==0) {
156                                    return null;
157                            }
158                            if (inferenceFilters.length==1) {
159                                    return inferenceFilters[0];
160                            }
161                            return new CompositeInferenceFilter<E, P, C, Long, AbstractEventBus.Handle<E,P,C,Long>, S>(inferenceFilters);
162                    }               
163                    
164            }
165            
166            private boolean assertPredicatesBeforePost;
167            
168            boolean isAssertPredicatesBeforePost() {
169                    return assertPredicatesBeforePost;
170            }
171            
172            HandleStrength getCollectorHandleStrength() {
173                    return collectorHandleStrength;
174            }
175                    
176            public LocalEventBusBase(Config<E,P,C,S> config) {
177                    
178                    super(
179                                    config.getEventType(), 
180                                    config.getStore(), 
181                                    config.getInferencePolicy(),
182                                    config.getInferenceFilter(),
183                                    config.getStatsCollector(), 
184                                    config.getStatsTimeUnit(),
185                                    config.getMatcher());
186                    
187                    unmodifiableStore = config.getStore().createUnmodifiableFacade();
188                    this.executorService = config.getExecutorService()==null ? null : new LocalTrackingExecutorService(config.getExecutorService(), false, "Bus executor");
189                    this.collectorHandleStrength = config.getCollectorHandleStrength();
190                    
191                    this.observableConverter = config.getObservableConverter();
192                    this.assertPredicatesBeforePost = config.isAssertPredicatesBeforePost();
193            }
194            
195            @Override
196            protected TrackingExecutorService createExecutorService(ExecutorService master, boolean oneOff, String name) {          
197                    return master==null ? null : new LocalTrackingExecutorService(master, oneOff, name);
198            }
199    
200            @Override
201            protected TrackingExecutorService createExecutorService( boolean oneOff, String name) {
202                    TrackingExecutorService es = getExecutorService();
203                    return es==null ? null : new LocalTrackingExecutorService(es, oneOff, name);
204            }
205            
206            @Override
207            protected TrackingExecutorService getExecutorService() {
208                    return executorService;
209            }
210            
211            private AtomicLong eventCounter = new AtomicLong();
212    
213            @Override
214            protected Long nextId() {
215                    return eventCounter.incrementAndGet();
216            }
217    
218            @Override
219            protected Handle<E, P, C, Long> newMasterHandle(PostCommand<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> postCommand) {
220                    
221                    MasterHandle<E,P,C, S> masterHandle = new MasterHandle<E,P,C, S>(this, postCommand.getEvent(), postCommand.getInferenceContext().getExecutorService(), nextId(), postCommand.isDirectPost(), postCommand.getValidators());
222                    if (!postCommand.isDirectPost()) {
223                            masterHandle.addDerivation(postCommand.getHandlerId(), postCommand.getHandler(), postCommand.getInputs());
224                    }
225                    return masterHandle;
226            }
227                    
228    //      /**
229    //       * To make it visible to local classes.
230    //       */
231    //      @Override
232    //      protected void postInternal(
233    //                      Handle<E, P, C, Long> masterHandle,
234    //                      ExecutorService masterExecutorService,
235    //                      Queue<InferenceCommand<E, P, C, Handle<E, P, C, Long>>> conclusionQueue,
236    //                      ExceptionHandler rootHandle) {
237    //              super.postInternal(masterHandle, masterExecutorService, conclusionQueue, rootHandle);
238    //      }
239            
240    //      /**
241    //       * createMasterHandle() exposed to local dispatch context.
242    //       */
243    //      protected Handle<E,P,C,Long> createMasterHandleExposed(E event, TrackingExecutorService joinDelegate, boolean directPost, Predicate<E,S>[] validators, Derivation<E,P,C> derivation) {
244    //              CreateMasterHandleResult cmhr = createMasterHandle(event, joinDelegate, directPost, validators, derivation);
245    //              return cmhr.isNew() ? cmhr.getHandle() : null;
246    //      }
247    
248            @Override
249            public void remove(E event) {
250                    if (getStore().getPrimaryKeyExtractor()==null) {
251                            getStore().writeLock().lock();
252                            try {
253                                    for (Handle<E,P,C,Long> handle: getStore()) {
254                                            if (event.equals(handle.getEvent())) {
255                                                    handle.remove();
256                                            }
257                                    }
258                            } finally {
259                                    getStore().writeLock().unlock();
260                            }
261                    } else {
262                            Handle<E, P, C, Long> h = getStore().getByPrimaryKey(event);
263                            if (h!=null) {
264                                    h.remove();
265                            }
266                    }
267            }
268            
269            private Lock rtcLock = new ReentrantLock();
270    
271            @Override
272            protected Lock getRtcLock() {
273                    return rtcLock;
274            }
275    
276            /**
277             * To make it visible to local classes.
278             */
279            @Override
280            protected com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long> processInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
281                    return super.processInferenceCommand(command);
282            }
283            
284            @Override
285            protected InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext() {
286                    
287                    boolean createQueue = InferencePolicy.AFTER_EVENT.compareTo(getInferencePolicy())<=0;
288                    final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = createQueue ? new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>() : null;
289                    final TrackingExecutorService hes = createExecutorService(false, "Handle executor");
290                    
291                    return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
292    
293                            private ExceptionHandler rootHandle;
294    
295                            @Override
296                            public int getInferenceChainLength() {
297                                    return 0;
298                            }
299    
300                            @Override
301                            public TrackingExecutorService getExecutorService() {
302                                    return hes;
303                            }
304    
305                            @Override
306                            public ExceptionHandler getRootHandle() {
307                                    return rootHandle;
308                            }
309    
310                            @Override
311                            public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
312                                    return LocalEventBusBase.this;
313                            }
314    
315                            @Override
316                            public void setRootHandle(ExceptionHandler rootHandle) {
317                                    if (this.rootHandle!=null && this.rootHandle!=rootHandle) {
318                                            throw new IllegalStateException("Root handle already set");
319                                    }
320                                    this.rootHandle = rootHandle;
321                            }
322    
323                            @Override
324                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
325                                    return LocalEventBusBase.this.createInferenceContext(this);
326                            }
327                            
328    
329                            @Override
330                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
331                                    return LocalEventBusBase.this.wrapInferenceContext(this);
332                            }
333                            
334                            @Override
335                            public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
336                                    if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
337                                            LocalEventBusBase.this.processInferenceCommand(command);
338                                    } else {
339                                            inferenceCommandsQueue.add(command);
340                                    }
341                            }
342    
343                            @Override
344                            public void processInferenceCommands() {
345                                    Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
346                                    while (cit.hasNext()) {
347                                            LocalEventBusBase.this.processInferenceCommand(cit.next());
348                                            cit.remove();
349                                    }
350                            }
351                            
352                            @Override
353                            public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
354                                    return inferenceCommandsQueue;
355                            }
356                            
357                    };
358            }
359    
360            InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> createInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> prev) {
361                    final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue;
362                    switch (getInferencePolicy()) {
363                    case EXCLUSIVE:
364                    case AFTER_ROOT_EVENT:
365                            inferenceCommandsQueue = prev.getInferenceCommandsQueue(); // Inherited.
366                            break;
367                    case AFTER_EVENT:
368                            inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event.
369                            break;
370                    case AFTER_HANDLER:
371                    case IMMEDIATELY:
372                            inferenceCommandsQueue = null; // No queue for 
373                            break;
374                    default:
375                            throw new IllegalArgumentException("Unexpected inference policy: "+getInferencePolicy());               
376                    }
377                    
378                    final TrackingExecutorService hes = createExecutorService(prev.getExecutorService(), false, "Handle executor");
379                    
380                    return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
381    
382                            @Override
383                            public int getInferenceChainLength() {
384                                    return prev.getInferenceChainLength()+1;
385                            }
386    
387                            @Override
388                            public TrackingExecutorService getExecutorService() {
389                                    return hes;
390                            }
391    
392                            @Override
393                            public ExceptionHandler getRootHandle() {
394                                    return prev.getRootHandle();
395                            }
396    
397                            @Override
398                            public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
399                                    return LocalEventBusBase.this;
400                            }
401    
402                            @Override
403                            public void setRootHandle(ExceptionHandler rootHandle) {
404                                    prev.setRootHandle(rootHandle);
405                            }                                       
406    
407                            @Override
408                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
409                                    return LocalEventBusBase.this.createInferenceContext(this);
410                            }
411    
412                            @Override
413                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
414                                    return LocalEventBusBase.this.wrapInferenceContext(this);
415                            }
416                            
417                            
418                            @Override
419                            public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
420                                    if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
421                                            LocalEventBusBase.this.processInferenceCommand(command);
422                                    } else {
423                                            inferenceCommandsQueue.add(command);
424                                    }
425                            }
426    
427                            @Override
428                            public void processInferenceCommands() {
429                                    Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
430                                    while (cit.hasNext()) {
431                                            LocalEventBusBase.this.processInferenceCommand(cit.next());
432                                            cit.remove();
433                                    }
434                            }
435                            
436                            @Override
437                            public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
438                                    return inferenceCommandsQueue;
439                            }
440                                                    
441                    };
442            }
443            
444            InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> wrapInferenceContext(final InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S> master) {
445                    if (!InferencePolicy.AFTER_HANDLER.equals(getInferencePolicy())) {
446                            throw new IllegalStateException("Can't use this method if inference policy is not AFTER_HANDLER");
447                    }
448                    
449                    final Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> inferenceCommandsQueue = new ConcurrentLinkedQueue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>>(); // New queue for after event.
450                    
451                    return new InferenceContext<E, P, C, Long, Handle<E, P, C, Long>, S>() {
452    
453                            @Override
454                            public int getInferenceChainLength() {
455                                    return master.getInferenceChainLength();
456                            }
457    
458                            @Override
459                            public TrackingExecutorService getExecutorService() {
460                                    return master.getExecutorService();
461                            }
462    
463                            @Override
464                            public ExceptionHandler getRootHandle() {
465                                    return master.getRootHandle();
466                            }
467    
468                            @Override
469                            public EventBus<E, P, C, Long, Handle<E, P, C, Long>, S> getBus() {
470                                    return LocalEventBusBase.this;
471                            }
472    
473                            @Override
474                            public void setRootHandle(ExceptionHandler rootHandle) {
475                                    throw new UnsupportedOperationException();
476                            }                                       
477    
478                            @Override
479                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> createNext() {
480                                    return master.createNext();
481                            }
482    
483                            @Override
484                            public InferenceContext<E, P, C, Long, com.hammurapi.eventbus.AbstractEventBus.Handle<E, P, C, Long>, S> wrap() {
485                                    return LocalEventBusBase.this.wrapInferenceContext(this);
486                            }
487                            
488                            @Override
489                            public void postInferenceCommand(InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
490                                    if (InferencePolicy.IMMEDIATELY.equals(getBus().getInferencePolicy())) {
491                                            LocalEventBusBase.this.processInferenceCommand(command);
492                                    } else {
493                                            inferenceCommandsQueue.add(command);
494                                    }
495                            }
496    
497                            @Override
498                            public void processInferenceCommands() {
499                                    Iterator<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> cit = inferenceCommandsQueue.iterator();
500                                    while (cit.hasNext()) {
501                                            LocalEventBusBase.this.processInferenceCommand(cit.next());
502                                            cit.remove();
503                                    }
504                            }
505                            
506                            @Override
507                            public Queue<InferenceCommand<E, P, C, Long, Handle<E, P, C, Long>, S>> getInferenceCommandsQueue() {
508                                    return inferenceCommandsQueue;
509                            }
510                            
511                    };
512            }
513    
514            @Override
515            protected void processRemoveCommand(RemoveCommand<E, P, C, Long, Handle<E, P, C, Long>, S> command) {
516                    ((LocalHandle<E,P,C,S>) command.getHandle()).remove(command.isForUpdate(), command.getInferenceContext());
517            }
518            
519    //      Iterator<InferenceCommand<E, P, C, K, H, S>> cit = inferenceContext.getInferenceCommandsQueue().iterator();
520    //      while (cit.hasNext()) {
521    //              processInferenceCommand(cit.next());
522    //              cit.remove();
523    //      }
524            
525    //      if (InferencePolicy.IMMEDIATELY.equals(inferenceContext.getBus().getInferencePolicy())) {
526    //              ((AbstractEventBus<E,P,C,K,H,S>) inferenceContext.getBus()).processInferenceCommand(postCommand);
527    //      } else {
528            
529            
530            
531    }