1 | package com.hammurapi.eventbus.snapshot.io; |
2 | |
3 | import java.io.File; |
4 | import java.io.IOException; |
5 | import java.lang.management.ManagementFactory; |
6 | import java.util.Collection; |
7 | import java.util.Date; |
8 | import java.util.HashMap; |
9 | import java.util.LinkedList; |
10 | import java.util.List; |
11 | import java.util.Map; |
12 | import java.util.Set; |
13 | |
14 | import org.eclipse.emf.common.util.EList; |
15 | import org.eclipse.emf.common.util.URI; |
16 | import org.eclipse.emf.ecore.EObject; |
17 | import org.eclipse.emf.ecore.resource.Resource; |
18 | import org.eclipse.emf.ecore.resource.ResourceSet; |
19 | import org.eclipse.emf.ecore.resource.impl.ResourceSetImpl; |
20 | import org.eclipse.emf.ecore.xmi.impl.XMIResourceFactoryImpl; |
21 | |
22 | import com.hammurapi.convert.ConvertingService; |
23 | import com.hammurapi.eventbus.EventBus; |
24 | import com.hammurapi.eventbus.EventBusException; |
25 | import com.hammurapi.eventbus.EventHandler; |
26 | import com.hammurapi.eventbus.EventStore; |
27 | import com.hammurapi.eventbus.snapshot.Derivation; |
28 | import com.hammurapi.eventbus.snapshot.Event; |
29 | import com.hammurapi.eventbus.snapshot.Handler; |
30 | import com.hammurapi.eventbus.snapshot.JoinEntry; |
31 | import com.hammurapi.eventbus.snapshot.JoinInput; |
32 | import com.hammurapi.eventbus.snapshot.JoinInputCollector; |
33 | import com.hammurapi.eventbus.snapshot.JoinNode; |
34 | import com.hammurapi.eventbus.snapshot.PredicateNode; |
35 | import com.hammurapi.eventbus.snapshot.PredicateNodeOutput; |
36 | import com.hammurapi.eventbus.snapshot.SnapshotFactory; |
37 | import com.hammurapi.extract.Predicate; |
38 | |
39 | public class SnapshotOutput<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements com.hammurapi.eventbus.AbstractEventBus.StateSnapshot<E, P, C, K, H, S> { |
40 | |
41 | com.hammurapi.eventbus.snapshot.Snapshot snapshot; |
42 | private File out; |
43 | private Map<K, Object> objMap = new HashMap<K, Object>() { |
44 | public Object put(K key, Object value) { |
45 | Object oldVal = super.put(key, value); |
46 | if (oldVal!=null) { |
47 | throw new IllegalArgumentException("Duplicate key: "+key+". Old value: "+oldVal+", new value "+value); |
48 | } |
49 | return oldVal; |
50 | } |
51 | }; |
52 | |
53 | private List<Runnable> refCommands = new LinkedList<Runnable>(); |
54 | |
55 | /** |
56 | * Takes snapshot, doesn't write it to a file. |
57 | */ |
58 | public SnapshotOutput() { |
59 | |
60 | } |
61 | |
62 | public SnapshotOutput(File out) { |
63 | this.out = out; |
64 | } |
65 | |
66 | public com.hammurapi.eventbus.snapshot.Snapshot getSnapshot() { |
67 | return snapshot; |
68 | } |
69 | |
70 | @Override |
71 | public void start() { |
72 | snapshot = SnapshotFactory.eINSTANCE.createSnapshot(); |
73 | snapshot.setTimestamp(new Date()); |
74 | snapshot.setJvmId(ManagementFactory.getRuntimeMXBean().getName()); |
75 | objMap.clear(); |
76 | refCommands.clear(); |
77 | } |
78 | |
79 | @Override |
80 | public void end(boolean success) { |
81 | if (success && out!=null) { |
82 | for (Runnable c: refCommands) { |
83 | c.run(); |
84 | } |
85 | ResourceSet resourceSet = new ResourceSetImpl(); |
86 | // Register the appropriate resource factory to handle all file extensions. |
87 | // |
88 | resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put(Resource.Factory.Registry.DEFAULT_EXTENSION, new XMIResourceFactoryImpl()); |
89 | |
90 | URI uri = URI.createFileURI(out.getAbsolutePath()); |
91 | Resource configResource = resourceSet.createResource(uri); |
92 | EList<EObject> contents = configResource.getContents(); |
93 | contents.add(snapshot); |
94 | try { |
95 | configResource.save(null); |
96 | } catch (IOException e) { |
97 | throw new EventBusException("Could not save snapshot: "+e, e); |
98 | } |
99 | } |
100 | } |
101 | |
102 | @Override |
103 | public void handler(K id, EventHandler<E, P, C, H, S> eventHandler) { |
104 | Handler handler = ConvertingService.convert(eventHandler, Handler.class); |
105 | snapshot.getElements().add(handler); |
106 | handler.setId(String.valueOf(id)); |
107 | objMap.put(id, handler); |
108 | } |
109 | |
110 | @Override |
111 | public void event(K id, E event, boolean directPost) { |
112 | Event se = ConvertingService.convert(event, Event.class); |
113 | se.setDirectPost(directPost); |
114 | snapshot.getElements().add(se); |
115 | se.setId(String.valueOf(id)); |
116 | objMap.put(id, se); |
117 | } |
118 | |
119 | |
120 | |
121 | @Override |
122 | public void derivation(final K eventId, final K handlerId, final List<K> inputs) { |
123 | refCommands.add(new Runnable() { |
124 | |
125 | @Override |
126 | public void run() { |
127 | Derivation der = SnapshotFactory.eINSTANCE.createDerivation(); |
128 | Event event = (Event) objMap.get(eventId); |
129 | if (event==null) { |
130 | throw new IllegalArgumentException("Event with id "+eventId+" not found."); |
131 | } |
132 | event.getDerivations().add(der); |
133 | Handler handler = (Handler) objMap.get(handlerId); |
134 | if (handler==null) { |
135 | throw new IllegalArgumentException("Handler with id "+eventId+" not found."); |
136 | } |
137 | der.setHandler(handler); |
138 | |
139 | for (K inputId: inputs) { |
140 | Event input = (Event) objMap.get(inputId); |
141 | if (input==null) { |
142 | throw new IllegalArgumentException("Event with id "+inputId+" not found."); |
143 | } |
144 | der.getInputs().add(input); |
145 | } |
146 | } |
147 | }); |
148 | |
149 | } |
150 | |
151 | @Override |
152 | public void predicateNode( |
153 | final K id, |
154 | final Predicate<E, C> predicate, |
155 | final Collection<K> trueChildren, |
156 | final Collection<K> trueHandlers, |
157 | final Collection<K> falseChildren, |
158 | final Collection<K> falseHandlers, |
159 | final boolean isRoot) { |
160 | |
161 | final PredicateNode pn = SnapshotFactory.eINSTANCE.createPredicateNode(); |
162 | snapshot.getElements().add(pn); |
163 | pn.setId(String.valueOf(id)); |
164 | com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class); |
165 | sp.setId(String.valueOf(id)); |
166 | pn.setPredicate(sp); |
167 | pn.setName(sp.getName()); |
168 | pn.setDetails(sp.getDetails()); |
169 | pn.setIsRoot(isRoot); |
170 | |
171 | objMap.put(id, pn); |
172 | |
173 | refCommands.add(new Runnable() { |
174 | |
175 | @Override |
176 | public void run() { |
177 | for (K tcId: trueChildren) { |
178 | PredicateNode tc = (PredicateNode) objMap.get(tcId); |
179 | if (tc==null) { |
180 | throw new IllegalArgumentException("Predicate node with id "+tcId+" not found."); |
181 | } |
182 | pn.getTrueChildren().add(tc); |
183 | } |
184 | for (K fcId: falseChildren) { |
185 | PredicateNode fc = (PredicateNode) objMap.get(fcId); |
186 | if (fc==null) { |
187 | throw new IllegalArgumentException("Predicate node with id "+fcId+" not found."); |
188 | } |
189 | pn.getFalseChildren().add(fc); |
190 | } |
191 | for (K thId: trueHandlers) { |
192 | PredicateNodeOutput th = (PredicateNodeOutput) objMap.get(thId); |
193 | if (th==null) { |
194 | throw new IllegalArgumentException("Predicate node output with id "+thId+" not found."); |
195 | } |
196 | pn.getTrueOutputs().add(th); |
197 | } |
198 | for (K fhId: falseHandlers) { |
199 | PredicateNodeOutput fh = (PredicateNodeOutput) objMap.get(fhId); |
200 | if (fh==null) { |
201 | throw new IllegalArgumentException("Predicate node output with id "+fhId+" not found."); |
202 | } |
203 | pn.getFalseOutputs().add(fh); |
204 | } |
205 | } |
206 | }); |
207 | |
208 | } |
209 | |
210 | @Override |
211 | public void joinInput(final K id, final K joinNodeId, final int index) { |
212 | final JoinInput ji = SnapshotFactory.eINSTANCE.createJoinInput(); |
213 | ji.setId(String.valueOf(id)); |
214 | ji.setIndex(index); |
215 | objMap.put(id, ji); |
216 | snapshot.getElements().add(ji); |
217 | |
218 | refCommands.add(new Runnable() { |
219 | |
220 | @Override |
221 | public void run() { |
222 | JoinNode jn = (JoinNode) objMap.get(joinNodeId); |
223 | if (jn==null) { |
224 | throw new IllegalArgumentException("Join node with id "+joinNodeId+" not found."); |
225 | } |
226 | ji.setJoinNode(jn); |
227 | } |
228 | }); |
229 | |
230 | } |
231 | |
232 | @Override |
233 | public void joinNode( |
234 | final K id, |
235 | final Predicate<E, C> predicate, |
236 | final Set<Integer> outputIndices, |
237 | final K eventHandlerId, |
238 | final K nextJoinNodeId) { |
239 | |
240 | final JoinNode jn = SnapshotFactory.eINSTANCE.createJoinNode(); |
241 | snapshot.getElements().add(jn); |
242 | jn.setName(String.valueOf(outputIndices)); |
243 | objMap.put(id, jn); |
244 | jn.setId(String.valueOf(id)); |
245 | |
246 | if (predicate!=null) { |
247 | com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class); |
248 | sp.setId(String.valueOf(id)); |
249 | jn.setPredicate(sp); |
250 | } |
251 | |
252 | for (Integer oi: outputIndices) { |
253 | jn.getOutputIndices().add(oi); |
254 | } |
255 | |
256 | refCommands.add(new Runnable() { |
257 | |
258 | @Override |
259 | public void run() { |
260 | if (eventHandlerId!=null) { |
261 | if (nextJoinNodeId!=null) { |
262 | throw new IllegalArgumentException("eventHandlerId and nextJoinNodeId are mutually exclusive"); |
263 | } |
264 | Handler handler = (Handler) objMap.get(eventHandlerId); |
265 | if (handler==null) { |
266 | throw new IllegalArgumentException("Handler with id "+eventHandlerId+" not found."); |
267 | } |
268 | jn.setHandler(handler); |
269 | } else { |
270 | if (nextJoinNodeId==null) { |
271 | throw new IllegalArgumentException("Either eventHandlerId or nextJoinNodeId should not be null"); |
272 | } |
273 | JoinNode next = (JoinNode) objMap.get(nextJoinNodeId); |
274 | if (next==null) { |
275 | throw new IllegalArgumentException("Join node with id "+nextJoinNodeId+" not found."); |
276 | } |
277 | jn.setNext(next); |
278 | } |
279 | } |
280 | }); |
281 | |
282 | } |
283 | |
284 | @Override |
285 | public void joinInputCollector( |
286 | final K joinNodeId, |
287 | final int[] indices, |
288 | final Collection<K[]> elements) { |
289 | |
290 | JoinNode jn = (JoinNode) objMap.get(joinNodeId); |
291 | final JoinInputCollector jic = SnapshotFactory.eINSTANCE.createJoinInputCollector(); |
292 | jn.getCollectors().add(jic); |
293 | for (int idx: indices) { |
294 | jic.getIndices().add(idx); |
295 | } |
296 | |
297 | refCommands.add(new Runnable() { |
298 | |
299 | @Override |
300 | public void run() { |
301 | for (K[] e: elements) { |
302 | JoinEntry je = SnapshotFactory.eINSTANCE.createJoinEntry(); |
303 | for (K ee: e) { |
304 | Event ev = (Event) objMap.get(ee); |
305 | if (ev==null) { |
306 | throw new IllegalArgumentException("Event with id "+ee+" not found."); |
307 | } |
308 | je.getEvents().add(ev); |
309 | } |
310 | jic.getJoinEntries().add(je); |
311 | } |
312 | } |
313 | }); |
314 | |
315 | |
316 | } |
317 | |
318 | } |