+/*******************************************************************************\r
+ * Copyright (c) 2012 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ * Semantum Oy - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.event.writer;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+\r
+import org.eclipse.core.runtime.IProgressMonitor;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.Status;\r
+import org.simantics.DatabaseJob;\r
+import org.simantics.Simantics;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.WriteGraph;\r
+import org.simantics.db.common.primitiverequest.PossibleObject;\r
+import org.simantics.db.common.request.ObjectsWithType;\r
+import org.simantics.db.common.request.WriteRequest;\r
+import org.simantics.db.common.utils.NameUtils;\r
+import org.simantics.db.exception.CancelTransactionException;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.layer0.genericrelation.Dependencies;\r
+import org.simantics.db.layer0.request.PossibleModel;\r
+import org.simantics.event.Activator;\r
+import org.simantics.event.ontology.EventResource;\r
+import org.simantics.layer0.Layer0;\r
+import org.simantics.operation.Layer0X;\r
+import org.simantics.scl.runtime.function.Function;\r
+import org.simantics.simulation.ontology.SimulationResource;\r
+import org.simantics.utils.datastructures.MapList;\r
+\r
+/**\r
+ * A self-scheduling job that checks queued event resources and tries to resolve\r
+ * modules from the model based on the event source module name.\r
+ * \r
+ * <p>\r
+ * The job will automatically stop self-scheduling when the experiment run\r
+ * referenced by the event producer becomes inactive.\r
+ * \r
+ * <p>\r
+ * Must be disposed when no longer used.\r
+ * \r
+ * @author Tuukka Lehtonen\r
+ * \r
+ * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all \r
+ */\r
+public class EventSourceResolver extends DatabaseJob {\r
+\r
+ /**\r
+ * Externalizes the logic of checking whether a resource found from the\r
+ * Dependencies index can be an event source. The usual implementation will\r
+ * check whether the resource is an instance of a domain specific type.\r
+ * \r
+ */\r
+ public static interface Filter {\r
+ boolean accept(ReadGraph graph, Resource possibleSource)\r
+ throws DatabaseException;\r
+ }\r
+\r
+ private final boolean DEBUG = false;\r
+ private final boolean DEBUG_DETAIL = false;\r
+\r
+ private static final int RESOLUTION_SCHEDULE_PERIOD = 2000;\r
+\r
+ public List<Resource> events = new ArrayList<Resource>();\r
+\r
+ private boolean initialEventsResolved = false;\r
+ private Resource eventLog;\r
+ private Filter eventSourceFilter;\r
+ private VirtualGraph virtualGraph;\r
+ private AtomicBoolean polling = new AtomicBoolean(true);\r
+\r
+ public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) {\r
+ super("Event Source Resolver");\r
+ setPriority(DECORATE);\r
+ setUser(false);\r
+ setSystem(true);\r
+ this.virtualGraph = virtualGraph;\r
+ this.eventLog = eventLog;\r
+ this.eventSourceFilter = eventSourceFilter;\r
+ }\r
+\r
+ public void dispose() {\r
+ if (DEBUG)\r
+ System.out.println(this + ": DISPOSE");\r
+ polling.set(false);\r
+ }\r
+\r
+ @Override\r
+ public boolean shouldRun() {\r
+ return polling.get() && Simantics.peekSession() != null;\r
+ }\r
+\r
+ @Override\r
+ protected IStatus run(final IProgressMonitor monitor) {\r
+ try {\r
+ Session session = Simantics.peekSession();\r
+ if (session == null)\r
+ return Status.CANCEL_STATUS;\r
+\r
+ final Resource[] events = pruneEvents();\r
+ if (events.length == 0 && initialEventsResolved)\r
+ return Status.CANCEL_STATUS;\r
+\r
+ if (DEBUG) {\r
+ System.out.println(this + ": resolve " + events.length + " events");\r
+ if (!initialEventsResolved)\r
+ System.out.println(this + ": resolve initial events");\r
+ }\r
+\r
+ setThread(Thread.currentThread());\r
+ monitor.beginTask("Resolve " + events.length + " events", events.length);\r
+ try {\r
+ session.sync(new WriteRequest(virtualGraph) {\r
+ Resource model;\r
+ @SuppressWarnings("rawtypes")\r
+ Function indexFunction;\r
+ Layer0 L0;\r
+ Layer0X L0X;\r
+ EventResource EVENT;\r
+\r
+ @Override\r
+ public void perform(WriteGraph graph) throws DatabaseException {\r
+ setThread(Thread.currentThread());\r
+\r
+ L0 = Layer0.getInstance(graph);\r
+ L0X = Layer0X.getInstance(graph);\r
+ EVENT = EventResource.getInstance(graph);\r
+\r
+ Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer));\r
+ if (run == null)\r
+ throw new CancelTransactionException();\r
+ if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive))\r
+ throw new CancelTransactionException();\r
+\r
+ model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf));\r
+ if (model == null)\r
+ throw new CancelTransactionException();\r
+\r
+ indexFunction = graph.adapt(L0X.Dependencies, Function.class);\r
+\r
+ if (!initialEventsResolved) {\r
+ MapList<String, Resource> initialEventsBySource = new MapList<String,Resource>();\r
+ for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) {\r
+ Collection<Resource> initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event));\r
+ if (DEBUG)\r
+ System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events");\r
+ MapList<String, Resource> bySource = sortEventsBySource(graph, initialEvents);\r
+ for(String key : bySource.getKeys()) {\r
+ initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key));\r
+ }\r
+ }\r
+ for (String sourceName : initialEventsBySource.getKeys())\r
+ resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName));\r
+ initialEventsResolved = true;\r
+ }\r
+\r
+ MapList<String, Resource> eventsBySource = sortEventsBySource(graph, Arrays.asList(events));\r
+ for (String sourceName : eventsBySource.getKeys()) {\r
+ resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName));\r
+ monitor.worked(1);\r
+ }\r
+ }\r
+\r
+ private MapList<String, Resource> sortEventsBySource(ReadGraph graph, Collection<Resource> events) throws DatabaseException {\r
+ MapList<String, Resource> result = new MapList<String, Resource>();\r
+ for (Resource event : events) {\r
+ Collection<Resource> sources = graph.getObjects(event, EVENT.Event_source);\r
+ if (!sources.isEmpty())\r
+ continue;\r
+\r
+ String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName);\r
+ if (sourceName != null && !sourceName.trim().isEmpty())\r
+ result.add(sourceName, event); \r
+ }\r
+ return result;\r
+ }\r
+\r
+ @SuppressWarnings({ "unchecked" })\r
+ private void resolveEvents(WriteGraph graph, String sourceName, Collection<Resource> eventsForSource) throws DatabaseException {\r
+ if (DEBUG)\r
+ System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName);\r
+ if (sourceName == null)\r
+ throw new NullPointerException("null source name");\r
+\r
+ monitor.subTask("Resolve events with source " + sourceName);\r
+ if (DEBUG)\r
+ System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName);\r
+ List<Map<String, Object>> results = (List<Map<String, Object>>) indexFunction.apply(graph, model, "Name:" + sourceName);\r
+ for (Map<String, Object> result : results) {\r
+ Resource r = (Resource) result.get(Dependencies.FIELD_RESOURCE);\r
+ if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r))\r
+ continue;\r
+ Resource rModel = graph.sync(new PossibleModel(r));\r
+ if (model.equals(rModel)) {\r
+ if (DEBUG)\r
+ System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true));\r
+\r
+ for (Resource event : eventsForSource) {\r
+ Resource otherEvent = resolveAndMarkEventPair(graph, r, event);\r
+\r
+ if (otherEvent == null) {\r
+ // No match for this event, add also inverse of Event_source relation.\r
+ graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r);\r
+ } else {\r
+ // Found the event that matches this one, remove Event_source_inverse to\r
+ // prevent bloating the event source resource with unnecessary statements.\r
+ graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r);\r
+ graph.claim(otherEvent, EVENT.Event_source, null, r);\r
+ graph.claim(event, EVENT.Event_source, null, r);\r
+ }\r
+ }\r
+ }\r
+ }\r
+ }\r
+\r
+ private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException {\r
+ Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index);\r
+ boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent);\r
+\r
+ for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) {\r
+ Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index);\r
+ boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent);\r
+ if (index != null && index.equals(eventIndex) && returnEvent != ret) {\r
+ if (returnEvent) {\r
+ if (DEBUG_DETAIL)\r
+ System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent);\r
+ graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent);\r
+ } else {\r
+ if (DEBUG_DETAIL)\r
+ System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent);\r
+ graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event);\r
+ }\r
+ return otherEvent;\r
+ }\r
+ }\r
+\r
+ return null;\r
+ }\r
+ });\r
+ } catch (CancelTransactionException e) {\r
+ polling.set(false);\r
+ return Status.CANCEL_STATUS;\r
+ } catch (DatabaseException e) {\r
+ return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e);\r
+ }\r
+\r
+ return Status.OK_STATUS;\r
+ } finally {\r
+ monitor.done();\r
+ schedule(RESOLUTION_SCHEDULE_PERIOD);\r
+ }\r
+ }\r
+\r
+ private Resource[] pruneEvents() {\r
+ synchronized ( this.events ) {\r
+ Resource[] events = this.events.toArray(Resource.NONE);\r
+ this.events.clear();\r
+ return events;\r
+ }\r
+ }\r
+\r
+ public void queueEvents(Collection<Resource> events) {\r
+ synchronized (this.events) {\r
+ this.events.addAll(events);\r
+ }\r
+ }\r
+\r
+ public void queueEvent(Resource event) {\r
+ synchronized (this.events) {\r
+ this.events.add(event);\r
+ }\r
+ }\r
+\r
+}\r