X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventSourceResolver.java;fp=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventSourceResolver.java;h=96375d0a3773c5c9186a2f38ba5d336cad6e927d;hb=969bd23cab98a79ca9101af33334000879fb60c5;hp=0000000000000000000000000000000000000000;hpb=866dba5cd5a3929bbeae85991796acb212338a08;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java new file mode 100644 index 000000000..96375d0a3 --- /dev/null +++ b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java @@ -0,0 +1,290 @@ +/******************************************************************************* + * Copyright (c) 2012 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Semantum Oy - initial API and implementation + *******************************************************************************/ +package org.simantics.event.writer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.DatabaseJob; +import org.simantics.Simantics; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteGraph; +import org.simantics.db.common.primitiverequest.PossibleObject; +import org.simantics.db.common.request.ObjectsWithType; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.layer0.genericrelation.Dependencies; +import org.simantics.db.layer0.request.PossibleModel; +import org.simantics.event.Activator; +import org.simantics.event.ontology.EventResource; +import org.simantics.layer0.Layer0; +import org.simantics.operation.Layer0X; +import org.simantics.scl.runtime.function.Function; +import org.simantics.simulation.ontology.SimulationResource; +import org.simantics.utils.datastructures.MapList; + +/** + * A self-scheduling job that checks queued event resources and tries to resolve + * modules from the model based on the event source module name. + * + *

+ * The job will automatically stop self-scheduling when the experiment run + * referenced by the event producer becomes inactive. + * + *

+ * Must be disposed when no longer used. + * + * @author Tuukka Lehtonen + * + * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all + */ +public class EventSourceResolver extends DatabaseJob { + + /** + * Externalizes the logic of checking whether a resource found from the + * Dependencies index can be an event source. The usual implementation will + * check whether the resource is an instance of a domain specific type. + * + */ + public static interface Filter { + boolean accept(ReadGraph graph, Resource possibleSource) + throws DatabaseException; + } + + private final boolean DEBUG = false; + private final boolean DEBUG_DETAIL = false; + + private static final int RESOLUTION_SCHEDULE_PERIOD = 2000; + + public List events = new ArrayList(); + + private boolean initialEventsResolved = false; + private Resource eventLog; + private Filter eventSourceFilter; + private VirtualGraph virtualGraph; + private AtomicBoolean polling = new AtomicBoolean(true); + + public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) { + super("Event Source Resolver"); + setPriority(DECORATE); + setUser(false); + setSystem(true); + this.virtualGraph = virtualGraph; + this.eventLog = eventLog; + this.eventSourceFilter = eventSourceFilter; + } + + public void dispose() { + if (DEBUG) + System.out.println(this + ": DISPOSE"); + polling.set(false); + } + + @Override + public boolean shouldRun() { + return polling.get() && Simantics.peekSession() != null; + } + + @Override + protected IStatus run(final IProgressMonitor monitor) { + try { + Session session = Simantics.peekSession(); + if (session == null) + return Status.CANCEL_STATUS; + + final Resource[] events = pruneEvents(); + if (events.length == 0 && initialEventsResolved) + return Status.CANCEL_STATUS; + + if (DEBUG) { + System.out.println(this + ": resolve " + events.length + " events"); + if (!initialEventsResolved) + System.out.println(this + ": resolve initial events"); + } + + setThread(Thread.currentThread()); + monitor.beginTask("Resolve " + events.length + " events", events.length); + try { + session.sync(new WriteRequest(virtualGraph) { + Resource model; + @SuppressWarnings("rawtypes") + Function indexFunction; + Layer0 L0; + Layer0X L0X; + EventResource EVENT; + + @Override + public void perform(WriteGraph graph) throws DatabaseException { + setThread(Thread.currentThread()); + + L0 = Layer0.getInstance(graph); + L0X = Layer0X.getInstance(graph); + EVENT = EventResource.getInstance(graph); + + Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer)); + if (run == null) + throw new CancelTransactionException(); + if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive)) + throw new CancelTransactionException(); + + model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf)); + if (model == null) + throw new CancelTransactionException(); + + indexFunction = graph.adapt(L0X.Dependencies, Function.class); + + if (!initialEventsResolved) { + MapList initialEventsBySource = new MapList(); + for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) { + Collection initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event)); + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events"); + MapList bySource = sortEventsBySource(graph, initialEvents); + for(String key : bySource.getKeys()) { + initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key)); + } + } + for (String sourceName : initialEventsBySource.getKeys()) + resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName)); + initialEventsResolved = true; + } + + MapList eventsBySource = sortEventsBySource(graph, Arrays.asList(events)); + for (String sourceName : eventsBySource.getKeys()) { + resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName)); + monitor.worked(1); + } + } + + private MapList sortEventsBySource(ReadGraph graph, Collection events) throws DatabaseException { + MapList result = new MapList(); + for (Resource event : events) { + Collection sources = graph.getObjects(event, EVENT.Event_source); + if (!sources.isEmpty()) + continue; + + String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName); + if (sourceName != null && !sourceName.trim().isEmpty()) + result.add(sourceName, event); + } + return result; + } + + @SuppressWarnings({ "unchecked" }) + private void resolveEvents(WriteGraph graph, String sourceName, Collection eventsForSource) throws DatabaseException { + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName); + if (sourceName == null) + throw new NullPointerException("null source name"); + + monitor.subTask("Resolve events with source " + sourceName); + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName); + List> results = (List>) indexFunction.apply(graph, model, "Name:" + sourceName); + for (Map result : results) { + Resource r = (Resource) result.get(Dependencies.FIELD_RESOURCE); + if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r)) + continue; + Resource rModel = graph.sync(new PossibleModel(r)); + if (model.equals(rModel)) { + if (DEBUG) + System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true)); + + for (Resource event : eventsForSource) { + Resource otherEvent = resolveAndMarkEventPair(graph, r, event); + + if (otherEvent == null) { + // No match for this event, add also inverse of Event_source relation. + graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r); + } else { + // Found the event that matches this one, remove Event_source_inverse to + // prevent bloating the event source resource with unnecessary statements. + graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r); + graph.claim(otherEvent, EVENT.Event_source, null, r); + graph.claim(event, EVENT.Event_source, null, r); + } + } + } + } + } + + private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException { + Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index); + boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent); + + for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) { + Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index); + boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent); + if (index != null && index.equals(eventIndex) && returnEvent != ret) { + if (returnEvent) { + if (DEBUG_DETAIL) + System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent); + graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent); + } else { + if (DEBUG_DETAIL) + System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent); + graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event); + } + return otherEvent; + } + } + + return null; + } + }); + } catch (CancelTransactionException e) { + polling.set(false); + return Status.CANCEL_STATUS; + } catch (DatabaseException e) { + return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e); + } + + return Status.OK_STATUS; + } finally { + monitor.done(); + schedule(RESOLUTION_SCHEDULE_PERIOD); + } + } + + private Resource[] pruneEvents() { + synchronized ( this.events ) { + Resource[] events = this.events.toArray(Resource.NONE); + this.events.clear(); + return events; + } + } + + public void queueEvents(Collection events) { + synchronized (this.events) { + this.events.addAll(events); + } + } + + public void queueEvent(Resource event) { + synchronized (this.events) { + this.events.add(event); + } + } + +}