X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventSourceResolver.java;fp=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventSourceResolver.java;h=96375d0a3773c5c9186a2f38ba5d336cad6e927d;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git
diff --git a/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java
new file mode 100644
index 000000000..96375d0a3
--- /dev/null
+++ b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java
@@ -0,0 +1,290 @@
+/*******************************************************************************
+ * Copyright (c) 2012 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ * Semantum Oy - initial API and implementation
+ *******************************************************************************/
+package org.simantics.event.writer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.simantics.DatabaseJob;
+import org.simantics.Simantics;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.WriteGraph;
+import org.simantics.db.common.primitiverequest.PossibleObject;
+import org.simantics.db.common.request.ObjectsWithType;
+import org.simantics.db.common.request.WriteRequest;
+import org.simantics.db.common.utils.NameUtils;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.layer0.genericrelation.Dependencies;
+import org.simantics.db.layer0.request.PossibleModel;
+import org.simantics.event.Activator;
+import org.simantics.event.ontology.EventResource;
+import org.simantics.layer0.Layer0;
+import org.simantics.operation.Layer0X;
+import org.simantics.scl.runtime.function.Function;
+import org.simantics.simulation.ontology.SimulationResource;
+import org.simantics.utils.datastructures.MapList;
+
+/**
+ * A self-scheduling job that checks queued event resources and tries to resolve
+ * modules from the model based on the event source module name.
+ *
+ *
+ * The job will automatically stop self-scheduling when the experiment run
+ * referenced by the event producer becomes inactive.
+ *
+ *
+ * Must be disposed when no longer used.
+ *
+ * @author Tuukka Lehtonen
+ *
+ * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all
+ */
+public class EventSourceResolver extends DatabaseJob {
+
+ /**
+ * Externalizes the logic of checking whether a resource found from the
+ * Dependencies index can be an event source. The usual implementation will
+ * check whether the resource is an instance of a domain specific type.
+ *
+ */
+ public static interface Filter {
+ boolean accept(ReadGraph graph, Resource possibleSource)
+ throws DatabaseException;
+ }
+
+ private final boolean DEBUG = false;
+ private final boolean DEBUG_DETAIL = false;
+
+ private static final int RESOLUTION_SCHEDULE_PERIOD = 2000;
+
+ public List events = new ArrayList();
+
+ private boolean initialEventsResolved = false;
+ private Resource eventLog;
+ private Filter eventSourceFilter;
+ private VirtualGraph virtualGraph;
+ private AtomicBoolean polling = new AtomicBoolean(true);
+
+ public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) {
+ super("Event Source Resolver");
+ setPriority(DECORATE);
+ setUser(false);
+ setSystem(true);
+ this.virtualGraph = virtualGraph;
+ this.eventLog = eventLog;
+ this.eventSourceFilter = eventSourceFilter;
+ }
+
+ public void dispose() {
+ if (DEBUG)
+ System.out.println(this + ": DISPOSE");
+ polling.set(false);
+ }
+
+ @Override
+ public boolean shouldRun() {
+ return polling.get() && Simantics.peekSession() != null;
+ }
+
+ @Override
+ protected IStatus run(final IProgressMonitor monitor) {
+ try {
+ Session session = Simantics.peekSession();
+ if (session == null)
+ return Status.CANCEL_STATUS;
+
+ final Resource[] events = pruneEvents();
+ if (events.length == 0 && initialEventsResolved)
+ return Status.CANCEL_STATUS;
+
+ if (DEBUG) {
+ System.out.println(this + ": resolve " + events.length + " events");
+ if (!initialEventsResolved)
+ System.out.println(this + ": resolve initial events");
+ }
+
+ setThread(Thread.currentThread());
+ monitor.beginTask("Resolve " + events.length + " events", events.length);
+ try {
+ session.sync(new WriteRequest(virtualGraph) {
+ Resource model;
+ @SuppressWarnings("rawtypes")
+ Function indexFunction;
+ Layer0 L0;
+ Layer0X L0X;
+ EventResource EVENT;
+
+ @Override
+ public void perform(WriteGraph graph) throws DatabaseException {
+ setThread(Thread.currentThread());
+
+ L0 = Layer0.getInstance(graph);
+ L0X = Layer0X.getInstance(graph);
+ EVENT = EventResource.getInstance(graph);
+
+ Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer));
+ if (run == null)
+ throw new CancelTransactionException();
+ if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive))
+ throw new CancelTransactionException();
+
+ model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf));
+ if (model == null)
+ throw new CancelTransactionException();
+
+ indexFunction = graph.adapt(L0X.Dependencies, Function.class);
+
+ if (!initialEventsResolved) {
+ MapList initialEventsBySource = new MapList();
+ for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) {
+ Collection initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event));
+ if (DEBUG)
+ System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events");
+ MapList bySource = sortEventsBySource(graph, initialEvents);
+ for(String key : bySource.getKeys()) {
+ initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key));
+ }
+ }
+ for (String sourceName : initialEventsBySource.getKeys())
+ resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName));
+ initialEventsResolved = true;
+ }
+
+ MapList eventsBySource = sortEventsBySource(graph, Arrays.asList(events));
+ for (String sourceName : eventsBySource.getKeys()) {
+ resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName));
+ monitor.worked(1);
+ }
+ }
+
+ private MapList sortEventsBySource(ReadGraph graph, Collection events) throws DatabaseException {
+ MapList result = new MapList();
+ for (Resource event : events) {
+ Collection sources = graph.getObjects(event, EVENT.Event_source);
+ if (!sources.isEmpty())
+ continue;
+
+ String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName);
+ if (sourceName != null && !sourceName.trim().isEmpty())
+ result.add(sourceName, event);
+ }
+ return result;
+ }
+
+ @SuppressWarnings({ "unchecked" })
+ private void resolveEvents(WriteGraph graph, String sourceName, Collection eventsForSource) throws DatabaseException {
+ if (DEBUG)
+ System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName);
+ if (sourceName == null)
+ throw new NullPointerException("null source name");
+
+ monitor.subTask("Resolve events with source " + sourceName);
+ if (DEBUG)
+ System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName);
+ List