]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.event / src / org / simantics / event / writer / EventSourceResolver.java
diff --git a/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java
new file mode 100644 (file)
index 0000000..96375d0
--- /dev/null
@@ -0,0 +1,290 @@
+/*******************************************************************************\r
+ * Copyright (c) 2012 Association for Decentralized Information Management\r
+ * in Industry THTH ry.\r
+ * All rights reserved. This program and the accompanying materials\r
+ * are made available under the terms of the Eclipse Public License v1.0\r
+ * which accompanies this distribution, and is available at\r
+ * http://www.eclipse.org/legal/epl-v10.html\r
+ *\r
+ * Contributors:\r
+ *     Semantum Oy - initial API and implementation\r
+ *******************************************************************************/\r
+package org.simantics.event.writer;\r
+\r
+import java.util.ArrayList;\r
+import java.util.Arrays;\r
+import java.util.Collection;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.concurrent.atomic.AtomicBoolean;\r
+\r
+import org.eclipse.core.runtime.IProgressMonitor;\r
+import org.eclipse.core.runtime.IStatus;\r
+import org.eclipse.core.runtime.Status;\r
+import org.simantics.DatabaseJob;\r
+import org.simantics.Simantics;\r
+import org.simantics.db.ReadGraph;\r
+import org.simantics.db.Resource;\r
+import org.simantics.db.Session;\r
+import org.simantics.db.VirtualGraph;\r
+import org.simantics.db.WriteGraph;\r
+import org.simantics.db.common.primitiverequest.PossibleObject;\r
+import org.simantics.db.common.request.ObjectsWithType;\r
+import org.simantics.db.common.request.WriteRequest;\r
+import org.simantics.db.common.utils.NameUtils;\r
+import org.simantics.db.exception.CancelTransactionException;\r
+import org.simantics.db.exception.DatabaseException;\r
+import org.simantics.db.layer0.genericrelation.Dependencies;\r
+import org.simantics.db.layer0.request.PossibleModel;\r
+import org.simantics.event.Activator;\r
+import org.simantics.event.ontology.EventResource;\r
+import org.simantics.layer0.Layer0;\r
+import org.simantics.operation.Layer0X;\r
+import org.simantics.scl.runtime.function.Function;\r
+import org.simantics.simulation.ontology.SimulationResource;\r
+import org.simantics.utils.datastructures.MapList;\r
+\r
+/**\r
+ * A self-scheduling job that checks queued event resources and tries to resolve\r
+ * modules from the model based on the event source module name.\r
+ * \r
+ * <p>\r
+ * The job will automatically stop self-scheduling when the experiment run\r
+ * referenced by the event producer becomes inactive.\r
+ * \r
+ * <p>\r
+ * Must be disposed when no longer used.\r
+ * \r
+ * @author Tuukka Lehtonen\r
+ * \r
+ * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all \r
+ */\r
+public class EventSourceResolver extends DatabaseJob {\r
+\r
+    /**\r
+     * Externalizes the logic of checking whether a resource found from the\r
+     * Dependencies index can be an event source. The usual implementation will\r
+     * check whether the resource is an instance of a domain specific type.\r
+     * \r
+     */\r
+    public static interface Filter {\r
+        boolean accept(ReadGraph graph, Resource possibleSource)\r
+                throws DatabaseException;\r
+    }\r
+\r
+    private final boolean    DEBUG                      = false;\r
+    private final boolean    DEBUG_DETAIL               = false;\r
+\r
+    private static final int RESOLUTION_SCHEDULE_PERIOD = 2000;\r
+\r
+    public List<Resource>    events                     = new ArrayList<Resource>();\r
+\r
+    private boolean          initialEventsResolved      = false;\r
+    private Resource         eventLog;\r
+    private Filter           eventSourceFilter;\r
+    private VirtualGraph     virtualGraph;\r
+    private AtomicBoolean    polling                    = new AtomicBoolean(true);\r
+\r
+    public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) {\r
+        super("Event Source Resolver");\r
+        setPriority(DECORATE);\r
+        setUser(false);\r
+        setSystem(true);\r
+        this.virtualGraph = virtualGraph;\r
+        this.eventLog = eventLog;\r
+        this.eventSourceFilter = eventSourceFilter;\r
+    }\r
+\r
+    public void dispose() {\r
+        if (DEBUG)\r
+            System.out.println(this + ": DISPOSE");\r
+        polling.set(false);\r
+    }\r
+\r
+    @Override\r
+    public boolean shouldRun() {\r
+        return polling.get() && Simantics.peekSession() != null;\r
+    }\r
+\r
+    @Override\r
+    protected IStatus run(final IProgressMonitor monitor) {\r
+        try {\r
+            Session session = Simantics.peekSession();\r
+            if (session == null)\r
+                return Status.CANCEL_STATUS;\r
+\r
+            final Resource[] events = pruneEvents();\r
+            if (events.length == 0 && initialEventsResolved)\r
+                return Status.CANCEL_STATUS;\r
+\r
+            if (DEBUG) {\r
+                System.out.println(this + ": resolve " + events.length + " events");\r
+                if (!initialEventsResolved)\r
+                    System.out.println(this + ": resolve initial events");\r
+            }\r
+\r
+            setThread(Thread.currentThread());\r
+            monitor.beginTask("Resolve " + events.length + " events", events.length);\r
+            try {\r
+                session.sync(new WriteRequest(virtualGraph) {\r
+                    Resource model;\r
+                    @SuppressWarnings("rawtypes")\r
+                    Function indexFunction;\r
+                    Layer0 L0;\r
+                    Layer0X L0X;\r
+                    EventResource EVENT;\r
+\r
+                    @Override\r
+                    public void perform(WriteGraph graph) throws DatabaseException {\r
+                        setThread(Thread.currentThread());\r
+\r
+                        L0 = Layer0.getInstance(graph);\r
+                        L0X = Layer0X.getInstance(graph);\r
+                        EVENT = EventResource.getInstance(graph);\r
+\r
+                        Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer));\r
+                        if (run == null)\r
+                            throw new CancelTransactionException();\r
+                        if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive))\r
+                            throw new CancelTransactionException();\r
+\r
+                        model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf));\r
+                        if (model == null)\r
+                            throw new CancelTransactionException();\r
+\r
+                        indexFunction = graph.adapt(L0X.Dependencies, Function.class);\r
+\r
+                        if (!initialEventsResolved) {\r
+                            MapList<String, Resource> initialEventsBySource = new MapList<String,Resource>();\r
+                               for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) {\r
+                                Collection<Resource> initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event));\r
+                                if (DEBUG)\r
+                                    System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events");\r
+                                MapList<String, Resource> bySource = sortEventsBySource(graph, initialEvents);\r
+                                for(String key : bySource.getKeys()) {\r
+                                       initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key));\r
+                                }\r
+                               }\r
+                               for (String sourceName : initialEventsBySource.getKeys())\r
+                                       resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName));\r
+                            initialEventsResolved = true;\r
+                        }\r
+\r
+                        MapList<String, Resource> eventsBySource = sortEventsBySource(graph, Arrays.asList(events));\r
+                        for (String sourceName : eventsBySource.getKeys()) {\r
+                            resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName));\r
+                            monitor.worked(1);\r
+                        }\r
+                    }\r
+\r
+                    private MapList<String, Resource> sortEventsBySource(ReadGraph graph, Collection<Resource> events) throws DatabaseException {\r
+                        MapList<String, Resource> result = new MapList<String, Resource>();\r
+                        for (Resource event : events) {\r
+                            Collection<Resource> sources = graph.getObjects(event, EVENT.Event_source);\r
+                            if (!sources.isEmpty())\r
+                                continue;\r
+\r
+                            String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName);\r
+                            if (sourceName != null && !sourceName.trim().isEmpty())\r
+                                result.add(sourceName, event); \r
+                        }\r
+                        return result;\r
+                    }\r
+\r
+                    @SuppressWarnings({ "unchecked" })\r
+                    private void resolveEvents(WriteGraph graph, String sourceName, Collection<Resource> eventsForSource) throws DatabaseException {\r
+                        if (DEBUG)\r
+                            System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName);\r
+                        if (sourceName == null)\r
+                            throw new NullPointerException("null source name");\r
+\r
+                        monitor.subTask("Resolve events with source " + sourceName);\r
+                        if (DEBUG)\r
+                            System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName);\r
+                        List<Map<String, Object>> results = (List<Map<String, Object>>) indexFunction.apply(graph, model, "Name:" + sourceName);\r
+                        for (Map<String, Object> result : results) {\r
+                            Resource r = (Resource) result.get(Dependencies.FIELD_RESOURCE);\r
+                            if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r))\r
+                                continue;\r
+                            Resource rModel = graph.sync(new PossibleModel(r));\r
+                            if (model.equals(rModel)) {\r
+                                if (DEBUG)\r
+                                    System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true));\r
+\r
+                                for (Resource event : eventsForSource) {\r
+                                    Resource otherEvent = resolveAndMarkEventPair(graph, r, event);\r
+\r
+                                    if (otherEvent == null) {\r
+                                        // No match for this event, add also inverse of Event_source relation.\r
+                                        graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r);\r
+                                    } else {\r
+                                        // Found the event that matches this one, remove Event_source_inverse to\r
+                                        // prevent bloating the event source resource with unnecessary statements.\r
+                                        graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r);\r
+                                        graph.claim(otherEvent, EVENT.Event_source, null, r);\r
+                                        graph.claim(event, EVENT.Event_source, null, r);\r
+                                    }\r
+                                }\r
+                            }\r
+                        }\r
+                    }\r
+\r
+                    private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException {\r
+                        Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index);\r
+                        boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent);\r
+\r
+                        for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) {\r
+                            Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index);\r
+                            boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent);\r
+                            if (index != null && index.equals(eventIndex) && returnEvent != ret) {\r
+                                if (returnEvent) {\r
+                                    if (DEBUG_DETAIL)\r
+                                        System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent);\r
+                                    graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent);\r
+                                } else {\r
+                                    if (DEBUG_DETAIL)\r
+                                        System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent);\r
+                                    graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event);\r
+                                }\r
+                                return otherEvent;\r
+                            }\r
+                        }\r
+\r
+                        return null;\r
+                    }\r
+                });\r
+            } catch (CancelTransactionException e) {\r
+                polling.set(false);\r
+                return Status.CANCEL_STATUS;\r
+            } catch (DatabaseException e) {\r
+                return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e);\r
+            }\r
+\r
+            return Status.OK_STATUS;\r
+        } finally {\r
+            monitor.done();\r
+            schedule(RESOLUTION_SCHEDULE_PERIOD);\r
+        }\r
+    }\r
+\r
+    private Resource[] pruneEvents() {\r
+        synchronized ( this.events ) {\r
+            Resource[] events = this.events.toArray(Resource.NONE);\r
+            this.events.clear();\r
+            return events;\r
+        }\r
+    }\r
+\r
+    public void queueEvents(Collection<Resource> events) {\r
+        synchronized (this.events) {\r
+            this.events.addAll(events);\r
+        }\r
+    }\r
+\r
+    public void queueEvent(Resource event) {\r
+        synchronized (this.events) {\r
+            this.events.add(event);\r
+        }\r
+    }\r
+\r
+}\r