001    package com.hammurapi.eventbus.snapshot.io;
002    
003    import java.io.File;
004    import java.io.IOException;
005    import java.lang.management.ManagementFactory;
006    import java.util.Collection;
007    import java.util.Date;
008    import java.util.HashMap;
009    import java.util.LinkedList;
010    import java.util.List;
011    import java.util.Map;
012    import java.util.Set;
013    
014    import org.eclipse.emf.common.util.EList;
015    import org.eclipse.emf.common.util.URI;
016    import org.eclipse.emf.ecore.EObject;
017    import org.eclipse.emf.ecore.resource.Resource;
018    import org.eclipse.emf.ecore.resource.ResourceSet;
019    import org.eclipse.emf.ecore.resource.impl.ResourceSetImpl;
020    import org.eclipse.emf.ecore.xmi.impl.XMIResourceFactoryImpl;
021    
022    import com.hammurapi.convert.ConvertingService;
023    import com.hammurapi.eventbus.EventBus;
024    import com.hammurapi.eventbus.EventBusException;
025    import com.hammurapi.eventbus.EventHandler;
026    import com.hammurapi.eventbus.EventStore;
027    import com.hammurapi.eventbus.snapshot.Derivation;
028    import com.hammurapi.eventbus.snapshot.Event;
029    import com.hammurapi.eventbus.snapshot.Handler;
030    import com.hammurapi.eventbus.snapshot.JoinEntry;
031    import com.hammurapi.eventbus.snapshot.JoinInput;
032    import com.hammurapi.eventbus.snapshot.JoinInputCollector;
033    import com.hammurapi.eventbus.snapshot.JoinNode;
034    import com.hammurapi.eventbus.snapshot.PredicateNode;
035    import com.hammurapi.eventbus.snapshot.PredicateNodeOutput;
036    import com.hammurapi.eventbus.snapshot.SnapshotFactory;
037    import com.hammurapi.extract.Predicate;
038    
039    public class SnapshotOutput<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements com.hammurapi.eventbus.AbstractEventBus.StateSnapshot<E, P, C, K, H, S> {
040            
041            com.hammurapi.eventbus.snapshot.Snapshot snapshot;
042            private File out;
043            private Map<K, Object> objMap = new HashMap<K, Object>() {
044                    public Object put(K key, Object value) {
045                            Object oldVal = super.put(key, value);
046                            if (oldVal!=null) {
047                                    throw new IllegalArgumentException("Duplicate key: "+key+". Old value: "+oldVal+", new value "+value);
048                            }
049                            return oldVal;
050                    }
051            };
052            
053            private List<Runnable> refCommands = new LinkedList<Runnable>();
054            
055            /**
056             * Takes snapshot, doesn't write it to a file.
057             */
058            public SnapshotOutput() {
059                    
060            }
061            
062            public SnapshotOutput(File out) {
063                    this.out = out; 
064            }       
065            
066            public com.hammurapi.eventbus.snapshot.Snapshot getSnapshot() {
067                    return snapshot;
068            }
069    
070            @Override
071            public void start() {
072                    snapshot = SnapshotFactory.eINSTANCE.createSnapshot();
073                    snapshot.setTimestamp(new Date());
074                    snapshot.setJvmId(ManagementFactory.getRuntimeMXBean().getName());
075                    objMap.clear();
076                    refCommands.clear();
077            }
078    
079            @Override
080            public void end(boolean success) {
081                    if (success && out!=null) {
082                            for (Runnable c: refCommands) {
083                                    c.run();
084                            }
085                            ResourceSet resourceSet = new ResourceSetImpl();
086                            // Register the appropriate resource factory to handle all file extensions.
087                            //
088                            resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put(Resource.Factory.Registry.DEFAULT_EXTENSION, new XMIResourceFactoryImpl());
089             
090                            URI uri = URI.createFileURI(out.getAbsolutePath());
091                            Resource configResource = resourceSet.createResource(uri);
092                            EList<EObject> contents = configResource.getContents();
093                            contents.add(snapshot);
094                            try {
095                                    configResource.save(null);
096                            } catch (IOException e) {
097                                    throw new EventBusException("Could not save snapshot: "+e, e);
098                            }                       
099                    }
100            }
101    
102            @Override
103            public void handler(K id, EventHandler<E, P, C, H, S> eventHandler) {
104                    Handler handler = ConvertingService.convert(eventHandler, Handler.class);
105                    snapshot.getElements().add(handler);
106                    handler.setId(String.valueOf(id));
107                    objMap.put(id, handler);
108            }
109    
110            @Override
111            public void event(K id, E event, boolean directPost) {
112                    Event se = ConvertingService.convert(event, Event.class);
113                    se.setDirectPost(directPost);
114                    snapshot.getElements().add(se);
115                    se.setId(String.valueOf(id));
116                    objMap.put(id, se);
117            }
118            
119            
120                    
121            @Override
122            public void derivation(final K eventId, final K handlerId, final List<K> inputs) {
123                    refCommands.add(new Runnable() {
124    
125                            @Override
126                            public void run() {
127                                    Derivation der = SnapshotFactory.eINSTANCE.createDerivation();
128                                    Event event = (Event) objMap.get(eventId);
129                                    if (event==null) {
130                                            throw new IllegalArgumentException("Event with id "+eventId+" not found.");
131                                    }
132                                    event.getDerivations().add(der);
133                                    Handler handler = (Handler) objMap.get(handlerId);
134                                    if (handler==null) {
135                                            throw new IllegalArgumentException("Handler with id "+eventId+" not found.");
136                                    }
137                                    der.setHandler(handler);
138    
139                                    for (K inputId: inputs) {
140                                            Event input = (Event) objMap.get(inputId);
141                                            if (input==null) {
142                                                    throw new IllegalArgumentException("Event with id "+inputId+" not found.");
143                                            }                       
144                                            der.getInputs().add(input);
145                                    }
146                            }
147                    });
148                    
149            }
150    
151            @Override
152            public void predicateNode(
153                            final K id, 
154                            final Predicate<E, C> predicate,
155                            final Collection<K> trueChildren, 
156                            final Collection<K> trueHandlers,
157                            final Collection<K> falseChildren, 
158                            final Collection<K> falseHandlers,
159                            final boolean isRoot) {
160                    
161                    final PredicateNode pn = SnapshotFactory.eINSTANCE.createPredicateNode();
162                    snapshot.getElements().add(pn);
163                    pn.setId(String.valueOf(id));
164                    com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class);
165                    sp.setId(String.valueOf(id));
166                    pn.setPredicate(sp);            
167                    pn.setName(sp.getName());
168                    pn.setDetails(sp.getDetails());
169                    pn.setIsRoot(isRoot);
170                    
171                    objMap.put(id, pn);
172                    
173                    refCommands.add(new Runnable() {
174    
175                            @Override
176                            public void run() {
177                                    for (K tcId: trueChildren) {
178                                            PredicateNode tc = (PredicateNode) objMap.get(tcId);
179                                            if (tc==null) {
180                                                    throw new IllegalArgumentException("Predicate node with id "+tcId+" not found.");                               
181                                            }
182                                            pn.getTrueChildren().add(tc);
183                                    }
184                                    for (K fcId: falseChildren) {
185                                            PredicateNode fc = (PredicateNode) objMap.get(fcId);
186                                            if (fc==null) {
187                                                    throw new IllegalArgumentException("Predicate node with id "+fcId+" not found.");                               
188                                            }
189                                            pn.getFalseChildren().add(fc);
190                                    }
191                                    for (K thId: trueHandlers) {
192                                            PredicateNodeOutput th = (PredicateNodeOutput) objMap.get(thId);
193                                            if (th==null) {
194                                                    throw new IllegalArgumentException("Predicate node output with id "+thId+" not found.");                                
195                                            }
196                                            pn.getTrueOutputs().add(th);
197                                    }
198                                    for (K fhId: falseHandlers) {
199                                            PredicateNodeOutput fh = (PredicateNodeOutput) objMap.get(fhId);
200                                            if (fh==null) {
201                                                    throw new IllegalArgumentException("Predicate node output with id "+fhId+" not found.");                                
202                                            }
203                                            pn.getFalseOutputs().add(fh);
204                                    }               
205                            }
206                    });
207                    
208            }
209    
210            @Override
211            public void joinInput(final K id, final K joinNodeId, final int index) {
212                    final JoinInput ji = SnapshotFactory.eINSTANCE.createJoinInput();
213                    ji.setId(String.valueOf(id));
214                    ji.setIndex(index);
215                    objMap.put(id, ji);
216                    snapshot.getElements().add(ji);
217                    
218                    refCommands.add(new Runnable() {
219    
220                            @Override
221                            public void run() {
222                                    JoinNode jn = (JoinNode) objMap.get(joinNodeId);
223                                    if (jn==null) {
224                                            throw new IllegalArgumentException("Join node with id "+joinNodeId+" not found.");
225                                    }
226                                    ji.setJoinNode(jn);
227                            }
228                    });
229                    
230            }
231            
232            @Override
233            public void joinNode(
234                            final K id, 
235                            final Predicate<E, C> predicate,
236                            final Set<Integer> outputIndices, 
237                            final K eventHandlerId, 
238                            final K nextJoinNodeId) {
239                    
240                    final JoinNode jn = SnapshotFactory.eINSTANCE.createJoinNode();
241                    snapshot.getElements().add(jn);
242                    jn.setName(String.valueOf(outputIndices));
243                    objMap.put(id, jn);
244                    jn.setId(String.valueOf(id));
245                    
246                    if (predicate!=null) {
247                            com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class);
248                            sp.setId(String.valueOf(id));
249                            jn.setPredicate(sp);
250                    }
251                    
252                    for (Integer oi: outputIndices) {
253                            jn.getOutputIndices().add(oi);
254                    }
255                    
256                    refCommands.add(new Runnable() {
257    
258                            @Override
259                            public void run() {
260                                    if (eventHandlerId!=null) {
261                                            if (nextJoinNodeId!=null) {
262                                                    throw new IllegalArgumentException("eventHandlerId and nextJoinNodeId are mutually exclusive");
263                                            }
264                                            Handler handler = (Handler) objMap.get(eventHandlerId);
265                                            if (handler==null) {
266                                                    throw new IllegalArgumentException("Handler with id "+eventHandlerId+" not found.");                            
267                                            }
268                                            jn.setHandler(handler);
269                                    } else {
270                                            if (nextJoinNodeId==null) {
271                                                    throw new IllegalArgumentException("Either eventHandlerId or nextJoinNodeId should not be null");                               
272                                            }                       
273                                            JoinNode next = (JoinNode) objMap.get(nextJoinNodeId);
274                                            if (next==null) {
275                                                    throw new IllegalArgumentException("Join node with id "+nextJoinNodeId+" not found.");                          
276                                            }                       
277                                            jn.setNext(next);
278                                    }
279                            }
280                    });
281                    
282            }
283    
284            @Override
285            public void joinInputCollector(
286                            final K joinNodeId, 
287                            final int[] indices,
288                            final Collection<K[]> elements) {
289                    
290                    JoinNode jn = (JoinNode) objMap.get(joinNodeId);                
291                    final JoinInputCollector jic = SnapshotFactory.eINSTANCE.createJoinInputCollector();
292                    jn.getCollectors().add(jic);
293                    for (int idx: indices) {
294                            jic.getIndices().add(idx);
295                    }
296                    
297                    refCommands.add(new Runnable() {
298    
299                            @Override
300                            public void run() {
301                                    for (K[] e: elements) {
302                                            JoinEntry je = SnapshotFactory.eINSTANCE.createJoinEntry();
303                                            for (K ee: e) {
304                                                    Event ev = (Event) objMap.get(ee);
305                                                    if (ev==null) {
306                                                            throw new IllegalArgumentException("Event with id "+ee+" not found.");                                  
307                                                    }
308                                                    je.getEvents().add(ev);
309                                            }
310                                            jic.getJoinEntries().add(je);
311                                    }
312                            }
313                    });
314                    
315                    
316            }
317    
318    }