001 package com.hammurapi.eventbus.snapshot.io;
002
003 import java.io.File;
004 import java.io.IOException;
005 import java.lang.management.ManagementFactory;
006 import java.util.Collection;
007 import java.util.Date;
008 import java.util.HashMap;
009 import java.util.LinkedList;
010 import java.util.List;
011 import java.util.Map;
012 import java.util.Set;
013
014 import org.eclipse.emf.common.util.EList;
015 import org.eclipse.emf.common.util.URI;
016 import org.eclipse.emf.ecore.EObject;
017 import org.eclipse.emf.ecore.resource.Resource;
018 import org.eclipse.emf.ecore.resource.ResourceSet;
019 import org.eclipse.emf.ecore.resource.impl.ResourceSetImpl;
020 import org.eclipse.emf.ecore.xmi.impl.XMIResourceFactoryImpl;
021
022 import com.hammurapi.convert.ConvertingService;
023 import com.hammurapi.eventbus.EventBus;
024 import com.hammurapi.eventbus.EventBusException;
025 import com.hammurapi.eventbus.EventHandler;
026 import com.hammurapi.eventbus.EventStore;
027 import com.hammurapi.eventbus.snapshot.Derivation;
028 import com.hammurapi.eventbus.snapshot.Event;
029 import com.hammurapi.eventbus.snapshot.Handler;
030 import com.hammurapi.eventbus.snapshot.JoinEntry;
031 import com.hammurapi.eventbus.snapshot.JoinInput;
032 import com.hammurapi.eventbus.snapshot.JoinInputCollector;
033 import com.hammurapi.eventbus.snapshot.JoinNode;
034 import com.hammurapi.eventbus.snapshot.PredicateNode;
035 import com.hammurapi.eventbus.snapshot.PredicateNodeOutput;
036 import com.hammurapi.eventbus.snapshot.SnapshotFactory;
037 import com.hammurapi.extract.Predicate;
038
039 public class SnapshotOutput<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements com.hammurapi.eventbus.AbstractEventBus.StateSnapshot<E, P, C, K, H, S> {
040
041 com.hammurapi.eventbus.snapshot.Snapshot snapshot;
042 private File out;
043 private Map<K, Object> objMap = new HashMap<K, Object>() {
044 public Object put(K key, Object value) {
045 Object oldVal = super.put(key, value);
046 if (oldVal!=null) {
047 throw new IllegalArgumentException("Duplicate key: "+key+". Old value: "+oldVal+", new value "+value);
048 }
049 return oldVal;
050 }
051 };
052
053 private List<Runnable> refCommands = new LinkedList<Runnable>();
054
055 /**
056 * Takes snapshot, doesn't write it to a file.
057 */
058 public SnapshotOutput() {
059
060 }
061
062 public SnapshotOutput(File out) {
063 this.out = out;
064 }
065
066 public com.hammurapi.eventbus.snapshot.Snapshot getSnapshot() {
067 return snapshot;
068 }
069
070 @Override
071 public void start() {
072 snapshot = SnapshotFactory.eINSTANCE.createSnapshot();
073 snapshot.setTimestamp(new Date());
074 snapshot.setJvmId(ManagementFactory.getRuntimeMXBean().getName());
075 objMap.clear();
076 refCommands.clear();
077 }
078
079 @Override
080 public void end(boolean success) {
081 if (success && out!=null) {
082 for (Runnable c: refCommands) {
083 c.run();
084 }
085 ResourceSet resourceSet = new ResourceSetImpl();
086 // Register the appropriate resource factory to handle all file extensions.
087 //
088 resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put(Resource.Factory.Registry.DEFAULT_EXTENSION, new XMIResourceFactoryImpl());
089
090 URI uri = URI.createFileURI(out.getAbsolutePath());
091 Resource configResource = resourceSet.createResource(uri);
092 EList<EObject> contents = configResource.getContents();
093 contents.add(snapshot);
094 try {
095 configResource.save(null);
096 } catch (IOException e) {
097 throw new EventBusException("Could not save snapshot: "+e, e);
098 }
099 }
100 }
101
102 @Override
103 public void handler(K id, EventHandler<E, P, C, H, S> eventHandler) {
104 Handler handler = ConvertingService.convert(eventHandler, Handler.class);
105 snapshot.getElements().add(handler);
106 handler.setId(String.valueOf(id));
107 objMap.put(id, handler);
108 }
109
110 @Override
111 public void event(K id, E event, boolean directPost) {
112 Event se = ConvertingService.convert(event, Event.class);
113 se.setDirectPost(directPost);
114 snapshot.getElements().add(se);
115 se.setId(String.valueOf(id));
116 objMap.put(id, se);
117 }
118
119
120
121 @Override
122 public void derivation(final K eventId, final K handlerId, final List<K> inputs) {
123 refCommands.add(new Runnable() {
124
125 @Override
126 public void run() {
127 Derivation der = SnapshotFactory.eINSTANCE.createDerivation();
128 Event event = (Event) objMap.get(eventId);
129 if (event==null) {
130 throw new IllegalArgumentException("Event with id "+eventId+" not found.");
131 }
132 event.getDerivations().add(der);
133 Handler handler = (Handler) objMap.get(handlerId);
134 if (handler==null) {
135 throw new IllegalArgumentException("Handler with id "+eventId+" not found.");
136 }
137 der.setHandler(handler);
138
139 for (K inputId: inputs) {
140 Event input = (Event) objMap.get(inputId);
141 if (input==null) {
142 throw new IllegalArgumentException("Event with id "+inputId+" not found.");
143 }
144 der.getInputs().add(input);
145 }
146 }
147 });
148
149 }
150
151 @Override
152 public void predicateNode(
153 final K id,
154 final Predicate<E, C> predicate,
155 final Collection<K> trueChildren,
156 final Collection<K> trueHandlers,
157 final Collection<K> falseChildren,
158 final Collection<K> falseHandlers,
159 final boolean isRoot) {
160
161 final PredicateNode pn = SnapshotFactory.eINSTANCE.createPredicateNode();
162 snapshot.getElements().add(pn);
163 pn.setId(String.valueOf(id));
164 com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class);
165 sp.setId(String.valueOf(id));
166 pn.setPredicate(sp);
167 pn.setName(sp.getName());
168 pn.setDetails(sp.getDetails());
169 pn.setIsRoot(isRoot);
170
171 objMap.put(id, pn);
172
173 refCommands.add(new Runnable() {
174
175 @Override
176 public void run() {
177 for (K tcId: trueChildren) {
178 PredicateNode tc = (PredicateNode) objMap.get(tcId);
179 if (tc==null) {
180 throw new IllegalArgumentException("Predicate node with id "+tcId+" not found.");
181 }
182 pn.getTrueChildren().add(tc);
183 }
184 for (K fcId: falseChildren) {
185 PredicateNode fc = (PredicateNode) objMap.get(fcId);
186 if (fc==null) {
187 throw new IllegalArgumentException("Predicate node with id "+fcId+" not found.");
188 }
189 pn.getFalseChildren().add(fc);
190 }
191 for (K thId: trueHandlers) {
192 PredicateNodeOutput th = (PredicateNodeOutput) objMap.get(thId);
193 if (th==null) {
194 throw new IllegalArgumentException("Predicate node output with id "+thId+" not found.");
195 }
196 pn.getTrueOutputs().add(th);
197 }
198 for (K fhId: falseHandlers) {
199 PredicateNodeOutput fh = (PredicateNodeOutput) objMap.get(fhId);
200 if (fh==null) {
201 throw new IllegalArgumentException("Predicate node output with id "+fhId+" not found.");
202 }
203 pn.getFalseOutputs().add(fh);
204 }
205 }
206 });
207
208 }
209
210 @Override
211 public void joinInput(final K id, final K joinNodeId, final int index) {
212 final JoinInput ji = SnapshotFactory.eINSTANCE.createJoinInput();
213 ji.setId(String.valueOf(id));
214 ji.setIndex(index);
215 objMap.put(id, ji);
216 snapshot.getElements().add(ji);
217
218 refCommands.add(new Runnable() {
219
220 @Override
221 public void run() {
222 JoinNode jn = (JoinNode) objMap.get(joinNodeId);
223 if (jn==null) {
224 throw new IllegalArgumentException("Join node with id "+joinNodeId+" not found.");
225 }
226 ji.setJoinNode(jn);
227 }
228 });
229
230 }
231
232 @Override
233 public void joinNode(
234 final K id,
235 final Predicate<E, C> predicate,
236 final Set<Integer> outputIndices,
237 final K eventHandlerId,
238 final K nextJoinNodeId) {
239
240 final JoinNode jn = SnapshotFactory.eINSTANCE.createJoinNode();
241 snapshot.getElements().add(jn);
242 jn.setName(String.valueOf(outputIndices));
243 objMap.put(id, jn);
244 jn.setId(String.valueOf(id));
245
246 if (predicate!=null) {
247 com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class);
248 sp.setId(String.valueOf(id));
249 jn.setPredicate(sp);
250 }
251
252 for (Integer oi: outputIndices) {
253 jn.getOutputIndices().add(oi);
254 }
255
256 refCommands.add(new Runnable() {
257
258 @Override
259 public void run() {
260 if (eventHandlerId!=null) {
261 if (nextJoinNodeId!=null) {
262 throw new IllegalArgumentException("eventHandlerId and nextJoinNodeId are mutually exclusive");
263 }
264 Handler handler = (Handler) objMap.get(eventHandlerId);
265 if (handler==null) {
266 throw new IllegalArgumentException("Handler with id "+eventHandlerId+" not found.");
267 }
268 jn.setHandler(handler);
269 } else {
270 if (nextJoinNodeId==null) {
271 throw new IllegalArgumentException("Either eventHandlerId or nextJoinNodeId should not be null");
272 }
273 JoinNode next = (JoinNode) objMap.get(nextJoinNodeId);
274 if (next==null) {
275 throw new IllegalArgumentException("Join node with id "+nextJoinNodeId+" not found.");
276 }
277 jn.setNext(next);
278 }
279 }
280 });
281
282 }
283
284 @Override
285 public void joinInputCollector(
286 final K joinNodeId,
287 final int[] indices,
288 final Collection<K[]> elements) {
289
290 JoinNode jn = (JoinNode) objMap.get(joinNodeId);
291 final JoinInputCollector jic = SnapshotFactory.eINSTANCE.createJoinInputCollector();
292 jn.getCollectors().add(jic);
293 for (int idx: indices) {
294 jic.getIndices().add(idx);
295 }
296
297 refCommands.add(new Runnable() {
298
299 @Override
300 public void run() {
301 for (K[] e: elements) {
302 JoinEntry je = SnapshotFactory.eINSTANCE.createJoinEntry();
303 for (K ee: e) {
304 Event ev = (Event) objMap.get(ee);
305 if (ev==null) {
306 throw new IllegalArgumentException("Event with id "+ee+" not found.");
307 }
308 je.getEvents().add(ev);
309 }
310 jic.getJoinEntries().add(je);
311 }
312 }
313 });
314
315
316 }
317
318 }