| 1 | package com.hammurapi.eventbus.snapshot.io; |
| 2 | |
| 3 | import java.io.File; |
| 4 | import java.io.IOException; |
| 5 | import java.lang.management.ManagementFactory; |
| 6 | import java.util.Collection; |
| 7 | import java.util.Date; |
| 8 | import java.util.HashMap; |
| 9 | import java.util.LinkedList; |
| 10 | import java.util.List; |
| 11 | import java.util.Map; |
| 12 | import java.util.Set; |
| 13 | |
| 14 | import org.eclipse.emf.common.util.EList; |
| 15 | import org.eclipse.emf.common.util.URI; |
| 16 | import org.eclipse.emf.ecore.EObject; |
| 17 | import org.eclipse.emf.ecore.resource.Resource; |
| 18 | import org.eclipse.emf.ecore.resource.ResourceSet; |
| 19 | import org.eclipse.emf.ecore.resource.impl.ResourceSetImpl; |
| 20 | import org.eclipse.emf.ecore.xmi.impl.XMIResourceFactoryImpl; |
| 21 | |
| 22 | import com.hammurapi.convert.ConvertingService; |
| 23 | import com.hammurapi.eventbus.EventBus; |
| 24 | import com.hammurapi.eventbus.EventBusException; |
| 25 | import com.hammurapi.eventbus.EventHandler; |
| 26 | import com.hammurapi.eventbus.EventStore; |
| 27 | import com.hammurapi.eventbus.snapshot.Derivation; |
| 28 | import com.hammurapi.eventbus.snapshot.Event; |
| 29 | import com.hammurapi.eventbus.snapshot.Handler; |
| 30 | import com.hammurapi.eventbus.snapshot.JoinEntry; |
| 31 | import com.hammurapi.eventbus.snapshot.JoinInput; |
| 32 | import com.hammurapi.eventbus.snapshot.JoinInputCollector; |
| 33 | import com.hammurapi.eventbus.snapshot.JoinNode; |
| 34 | import com.hammurapi.eventbus.snapshot.PredicateNode; |
| 35 | import com.hammurapi.eventbus.snapshot.PredicateNodeOutput; |
| 36 | import com.hammurapi.eventbus.snapshot.SnapshotFactory; |
| 37 | import com.hammurapi.extract.Predicate; |
| 38 | |
| 39 | public class SnapshotOutput<E, P extends Comparable<P>, C, K, H extends EventBus.Handle<E,P,C>, S extends EventStore<E,P,C,H,S>> implements com.hammurapi.eventbus.AbstractEventBus.StateSnapshot<E, P, C, K, H, S> { |
| 40 | |
| 41 | com.hammurapi.eventbus.snapshot.Snapshot snapshot; |
| 42 | private File out; |
| 43 | private Map<K, Object> objMap = new HashMap<K, Object>() { |
| 44 | public Object put(K key, Object value) { |
| 45 | Object oldVal = super.put(key, value); |
| 46 | if (oldVal!=null) { |
| 47 | throw new IllegalArgumentException("Duplicate key: "+key+". Old value: "+oldVal+", new value "+value); |
| 48 | } |
| 49 | return oldVal; |
| 50 | } |
| 51 | }; |
| 52 | |
| 53 | private List<Runnable> refCommands = new LinkedList<Runnable>(); |
| 54 | |
| 55 | /** |
| 56 | * Takes snapshot, doesn't write it to a file. |
| 57 | */ |
| 58 | public SnapshotOutput() { |
| 59 | |
| 60 | } |
| 61 | |
| 62 | public SnapshotOutput(File out) { |
| 63 | this.out = out; |
| 64 | } |
| 65 | |
| 66 | public com.hammurapi.eventbus.snapshot.Snapshot getSnapshot() { |
| 67 | return snapshot; |
| 68 | } |
| 69 | |
| 70 | @Override |
| 71 | public void start() { |
| 72 | snapshot = SnapshotFactory.eINSTANCE.createSnapshot(); |
| 73 | snapshot.setTimestamp(new Date()); |
| 74 | snapshot.setJvmId(ManagementFactory.getRuntimeMXBean().getName()); |
| 75 | objMap.clear(); |
| 76 | refCommands.clear(); |
| 77 | } |
| 78 | |
| 79 | @Override |
| 80 | public void end(boolean success) { |
| 81 | if (success && out!=null) { |
| 82 | for (Runnable c: refCommands) { |
| 83 | c.run(); |
| 84 | } |
| 85 | ResourceSet resourceSet = new ResourceSetImpl(); |
| 86 | // Register the appropriate resource factory to handle all file extensions. |
| 87 | // |
| 88 | resourceSet.getResourceFactoryRegistry().getExtensionToFactoryMap().put(Resource.Factory.Registry.DEFAULT_EXTENSION, new XMIResourceFactoryImpl()); |
| 89 | |
| 90 | URI uri = URI.createFileURI(out.getAbsolutePath()); |
| 91 | Resource configResource = resourceSet.createResource(uri); |
| 92 | EList<EObject> contents = configResource.getContents(); |
| 93 | contents.add(snapshot); |
| 94 | try { |
| 95 | configResource.save(null); |
| 96 | } catch (IOException e) { |
| 97 | throw new EventBusException("Could not save snapshot: "+e, e); |
| 98 | } |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | @Override |
| 103 | public void handler(K id, EventHandler<E, P, C, H, S> eventHandler) { |
| 104 | Handler handler = ConvertingService.convert(eventHandler, Handler.class); |
| 105 | snapshot.getElements().add(handler); |
| 106 | handler.setId(String.valueOf(id)); |
| 107 | objMap.put(id, handler); |
| 108 | } |
| 109 | |
| 110 | @Override |
| 111 | public void event(K id, E event, boolean directPost) { |
| 112 | Event se = ConvertingService.convert(event, Event.class); |
| 113 | se.setDirectPost(directPost); |
| 114 | snapshot.getElements().add(se); |
| 115 | se.setId(String.valueOf(id)); |
| 116 | objMap.put(id, se); |
| 117 | } |
| 118 | |
| 119 | |
| 120 | |
| 121 | @Override |
| 122 | public void derivation(final K eventId, final K handlerId, final List<K> inputs) { |
| 123 | refCommands.add(new Runnable() { |
| 124 | |
| 125 | @Override |
| 126 | public void run() { |
| 127 | Derivation der = SnapshotFactory.eINSTANCE.createDerivation(); |
| 128 | Event event = (Event) objMap.get(eventId); |
| 129 | if (event==null) { |
| 130 | throw new IllegalArgumentException("Event with id "+eventId+" not found."); |
| 131 | } |
| 132 | event.getDerivations().add(der); |
| 133 | Handler handler = (Handler) objMap.get(handlerId); |
| 134 | if (handler==null) { |
| 135 | throw new IllegalArgumentException("Handler with id "+eventId+" not found."); |
| 136 | } |
| 137 | der.setHandler(handler); |
| 138 | |
| 139 | for (K inputId: inputs) { |
| 140 | Event input = (Event) objMap.get(inputId); |
| 141 | if (input==null) { |
| 142 | throw new IllegalArgumentException("Event with id "+inputId+" not found."); |
| 143 | } |
| 144 | der.getInputs().add(input); |
| 145 | } |
| 146 | } |
| 147 | }); |
| 148 | |
| 149 | } |
| 150 | |
| 151 | @Override |
| 152 | public void predicateNode( |
| 153 | final K id, |
| 154 | final Predicate<E, C> predicate, |
| 155 | final Collection<K> trueChildren, |
| 156 | final Collection<K> trueHandlers, |
| 157 | final Collection<K> falseChildren, |
| 158 | final Collection<K> falseHandlers, |
| 159 | final boolean isRoot) { |
| 160 | |
| 161 | final PredicateNode pn = SnapshotFactory.eINSTANCE.createPredicateNode(); |
| 162 | snapshot.getElements().add(pn); |
| 163 | pn.setId(String.valueOf(id)); |
| 164 | com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class); |
| 165 | sp.setId(String.valueOf(id)); |
| 166 | pn.setPredicate(sp); |
| 167 | pn.setName(sp.getName()); |
| 168 | pn.setDetails(sp.getDetails()); |
| 169 | pn.setIsRoot(isRoot); |
| 170 | |
| 171 | objMap.put(id, pn); |
| 172 | |
| 173 | refCommands.add(new Runnable() { |
| 174 | |
| 175 | @Override |
| 176 | public void run() { |
| 177 | for (K tcId: trueChildren) { |
| 178 | PredicateNode tc = (PredicateNode) objMap.get(tcId); |
| 179 | if (tc==null) { |
| 180 | throw new IllegalArgumentException("Predicate node with id "+tcId+" not found."); |
| 181 | } |
| 182 | pn.getTrueChildren().add(tc); |
| 183 | } |
| 184 | for (K fcId: falseChildren) { |
| 185 | PredicateNode fc = (PredicateNode) objMap.get(fcId); |
| 186 | if (fc==null) { |
| 187 | throw new IllegalArgumentException("Predicate node with id "+fcId+" not found."); |
| 188 | } |
| 189 | pn.getFalseChildren().add(fc); |
| 190 | } |
| 191 | for (K thId: trueHandlers) { |
| 192 | PredicateNodeOutput th = (PredicateNodeOutput) objMap.get(thId); |
| 193 | if (th==null) { |
| 194 | throw new IllegalArgumentException("Predicate node output with id "+thId+" not found."); |
| 195 | } |
| 196 | pn.getTrueOutputs().add(th); |
| 197 | } |
| 198 | for (K fhId: falseHandlers) { |
| 199 | PredicateNodeOutput fh = (PredicateNodeOutput) objMap.get(fhId); |
| 200 | if (fh==null) { |
| 201 | throw new IllegalArgumentException("Predicate node output with id "+fhId+" not found."); |
| 202 | } |
| 203 | pn.getFalseOutputs().add(fh); |
| 204 | } |
| 205 | } |
| 206 | }); |
| 207 | |
| 208 | } |
| 209 | |
| 210 | @Override |
| 211 | public void joinInput(final K id, final K joinNodeId, final int index) { |
| 212 | final JoinInput ji = SnapshotFactory.eINSTANCE.createJoinInput(); |
| 213 | ji.setId(String.valueOf(id)); |
| 214 | ji.setIndex(index); |
| 215 | objMap.put(id, ji); |
| 216 | snapshot.getElements().add(ji); |
| 217 | |
| 218 | refCommands.add(new Runnable() { |
| 219 | |
| 220 | @Override |
| 221 | public void run() { |
| 222 | JoinNode jn = (JoinNode) objMap.get(joinNodeId); |
| 223 | if (jn==null) { |
| 224 | throw new IllegalArgumentException("Join node with id "+joinNodeId+" not found."); |
| 225 | } |
| 226 | ji.setJoinNode(jn); |
| 227 | } |
| 228 | }); |
| 229 | |
| 230 | } |
| 231 | |
| 232 | @Override |
| 233 | public void joinNode( |
| 234 | final K id, |
| 235 | final Predicate<E, C> predicate, |
| 236 | final Set<Integer> outputIndices, |
| 237 | final K eventHandlerId, |
| 238 | final K nextJoinNodeId) { |
| 239 | |
| 240 | final JoinNode jn = SnapshotFactory.eINSTANCE.createJoinNode(); |
| 241 | snapshot.getElements().add(jn); |
| 242 | jn.setName(String.valueOf(outputIndices)); |
| 243 | objMap.put(id, jn); |
| 244 | jn.setId(String.valueOf(id)); |
| 245 | |
| 246 | if (predicate!=null) { |
| 247 | com.hammurapi.eventbus.snapshot.Predicate sp = ConvertingService.convert(predicate, com.hammurapi.eventbus.snapshot.Predicate.class); |
| 248 | sp.setId(String.valueOf(id)); |
| 249 | jn.setPredicate(sp); |
| 250 | } |
| 251 | |
| 252 | for (Integer oi: outputIndices) { |
| 253 | jn.getOutputIndices().add(oi); |
| 254 | } |
| 255 | |
| 256 | refCommands.add(new Runnable() { |
| 257 | |
| 258 | @Override |
| 259 | public void run() { |
| 260 | if (eventHandlerId!=null) { |
| 261 | if (nextJoinNodeId!=null) { |
| 262 | throw new IllegalArgumentException("eventHandlerId and nextJoinNodeId are mutually exclusive"); |
| 263 | } |
| 264 | Handler handler = (Handler) objMap.get(eventHandlerId); |
| 265 | if (handler==null) { |
| 266 | throw new IllegalArgumentException("Handler with id "+eventHandlerId+" not found."); |
| 267 | } |
| 268 | jn.setHandler(handler); |
| 269 | } else { |
| 270 | if (nextJoinNodeId==null) { |
| 271 | throw new IllegalArgumentException("Either eventHandlerId or nextJoinNodeId should not be null"); |
| 272 | } |
| 273 | JoinNode next = (JoinNode) objMap.get(nextJoinNodeId); |
| 274 | if (next==null) { |
| 275 | throw new IllegalArgumentException("Join node with id "+nextJoinNodeId+" not found."); |
| 276 | } |
| 277 | jn.setNext(next); |
| 278 | } |
| 279 | } |
| 280 | }); |
| 281 | |
| 282 | } |
| 283 | |
| 284 | @Override |
| 285 | public void joinInputCollector( |
| 286 | final K joinNodeId, |
| 287 | final int[] indices, |
| 288 | final Collection<K[]> elements) { |
| 289 | |
| 290 | JoinNode jn = (JoinNode) objMap.get(joinNodeId); |
| 291 | final JoinInputCollector jic = SnapshotFactory.eINSTANCE.createJoinInputCollector(); |
| 292 | jn.getCollectors().add(jic); |
| 293 | for (int idx: indices) { |
| 294 | jic.getIndices().add(idx); |
| 295 | } |
| 296 | |
| 297 | refCommands.add(new Runnable() { |
| 298 | |
| 299 | @Override |
| 300 | public void run() { |
| 301 | for (K[] e: elements) { |
| 302 | JoinEntry je = SnapshotFactory.eINSTANCE.createJoinEntry(); |
| 303 | for (K ee: e) { |
| 304 | Event ev = (Event) objMap.get(ee); |
| 305 | if (ev==null) { |
| 306 | throw new IllegalArgumentException("Event with id "+ee+" not found."); |
| 307 | } |
| 308 | je.getEvents().add(ev); |
| 309 | } |
| 310 | jic.getJoinEntries().add(je); |
| 311 | } |
| 312 | } |
| 313 | }); |
| 314 | |
| 315 | |
| 316 | } |
| 317 | |
| 318 | } |