X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventSourceResolver.java;h=9e2b6667744e409f2a9e5c8f3df9e95598d37d4c;hb=HEAD;hp=96375d0a3773c5c9186a2f38ba5d336cad6e927d;hpb=969bd23cab98a79ca9101af33334000879fb60c5;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java index 96375d0a3..9e2b66677 100644 --- a/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java +++ b/bundles/org.simantics.event/src/org/simantics/event/writer/EventSourceResolver.java @@ -1,290 +1,290 @@ -/******************************************************************************* - * Copyright (c) 2012 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Semantum Oy - initial API and implementation - *******************************************************************************/ -package org.simantics.event.writer; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.Status; -import org.simantics.DatabaseJob; -import org.simantics.Simantics; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteGraph; -import org.simantics.db.common.primitiverequest.PossibleObject; -import org.simantics.db.common.request.ObjectsWithType; -import org.simantics.db.common.request.WriteRequest; -import org.simantics.db.common.utils.NameUtils; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.layer0.genericrelation.Dependencies; -import org.simantics.db.layer0.request.PossibleModel; -import org.simantics.event.Activator; -import org.simantics.event.ontology.EventResource; -import org.simantics.layer0.Layer0; -import org.simantics.operation.Layer0X; -import org.simantics.scl.runtime.function.Function; -import org.simantics.simulation.ontology.SimulationResource; -import org.simantics.utils.datastructures.MapList; - -/** - * A self-scheduling job that checks queued event resources and tries to resolve - * modules from the model based on the event source module name. - * - *

- * The job will automatically stop self-scheduling when the experiment run - * referenced by the event producer becomes inactive. - * - *

- * Must be disposed when no longer used. - * - * @author Tuukka Lehtonen - * - * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all - */ -public class EventSourceResolver extends DatabaseJob { - - /** - * Externalizes the logic of checking whether a resource found from the - * Dependencies index can be an event source. The usual implementation will - * check whether the resource is an instance of a domain specific type. - * - */ - public static interface Filter { - boolean accept(ReadGraph graph, Resource possibleSource) - throws DatabaseException; - } - - private final boolean DEBUG = false; - private final boolean DEBUG_DETAIL = false; - - private static final int RESOLUTION_SCHEDULE_PERIOD = 2000; - - public List events = new ArrayList(); - - private boolean initialEventsResolved = false; - private Resource eventLog; - private Filter eventSourceFilter; - private VirtualGraph virtualGraph; - private AtomicBoolean polling = new AtomicBoolean(true); - - public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) { - super("Event Source Resolver"); - setPriority(DECORATE); - setUser(false); - setSystem(true); - this.virtualGraph = virtualGraph; - this.eventLog = eventLog; - this.eventSourceFilter = eventSourceFilter; - } - - public void dispose() { - if (DEBUG) - System.out.println(this + ": DISPOSE"); - polling.set(false); - } - - @Override - public boolean shouldRun() { - return polling.get() && Simantics.peekSession() != null; - } - - @Override - protected IStatus run(final IProgressMonitor monitor) { - try { - Session session = Simantics.peekSession(); - if (session == null) - return Status.CANCEL_STATUS; - - final Resource[] events = pruneEvents(); - if (events.length == 0 && initialEventsResolved) - return Status.CANCEL_STATUS; - - if (DEBUG) { - System.out.println(this + ": resolve " + events.length + " events"); - if (!initialEventsResolved) - System.out.println(this + ": resolve initial events"); - } - - setThread(Thread.currentThread()); - monitor.beginTask("Resolve " + events.length + " events", events.length); - try { - session.sync(new WriteRequest(virtualGraph) { - Resource model; - @SuppressWarnings("rawtypes") - Function indexFunction; - Layer0 L0; - Layer0X L0X; - EventResource EVENT; - - @Override - public void perform(WriteGraph graph) throws DatabaseException { - setThread(Thread.currentThread()); - - L0 = Layer0.getInstance(graph); - L0X = Layer0X.getInstance(graph); - EVENT = EventResource.getInstance(graph); - - Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer)); - if (run == null) - throw new CancelTransactionException(); - if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive)) - throw new CancelTransactionException(); - - model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf)); - if (model == null) - throw new CancelTransactionException(); - - indexFunction = graph.adapt(L0X.Dependencies, Function.class); - - if (!initialEventsResolved) { - MapList initialEventsBySource = new MapList(); - for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) { - Collection initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event)); - if (DEBUG) - System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events"); - MapList bySource = sortEventsBySource(graph, initialEvents); - for(String key : bySource.getKeys()) { - initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key)); - } - } - for (String sourceName : initialEventsBySource.getKeys()) - resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName)); - initialEventsResolved = true; - } - - MapList eventsBySource = sortEventsBySource(graph, Arrays.asList(events)); - for (String sourceName : eventsBySource.getKeys()) { - resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName)); - monitor.worked(1); - } - } - - private MapList sortEventsBySource(ReadGraph graph, Collection events) throws DatabaseException { - MapList result = new MapList(); - for (Resource event : events) { - Collection sources = graph.getObjects(event, EVENT.Event_source); - if (!sources.isEmpty()) - continue; - - String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName); - if (sourceName != null && !sourceName.trim().isEmpty()) - result.add(sourceName, event); - } - return result; - } - - @SuppressWarnings({ "unchecked" }) - private void resolveEvents(WriteGraph graph, String sourceName, Collection eventsForSource) throws DatabaseException { - if (DEBUG) - System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName); - if (sourceName == null) - throw new NullPointerException("null source name"); - - monitor.subTask("Resolve events with source " + sourceName); - if (DEBUG) - System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName); - List> results = (List>) indexFunction.apply(graph, model, "Name:" + sourceName); - for (Map result : results) { - Resource r = (Resource) result.get(Dependencies.FIELD_RESOURCE); - if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r)) - continue; - Resource rModel = graph.sync(new PossibleModel(r)); - if (model.equals(rModel)) { - if (DEBUG) - System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true)); - - for (Resource event : eventsForSource) { - Resource otherEvent = resolveAndMarkEventPair(graph, r, event); - - if (otherEvent == null) { - // No match for this event, add also inverse of Event_source relation. - graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r); - } else { - // Found the event that matches this one, remove Event_source_inverse to - // prevent bloating the event source resource with unnecessary statements. - graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r); - graph.claim(otherEvent, EVENT.Event_source, null, r); - graph.claim(event, EVENT.Event_source, null, r); - } - } - } - } - } - - private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException { - Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index); - boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent); - - for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) { - Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index); - boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent); - if (index != null && index.equals(eventIndex) && returnEvent != ret) { - if (returnEvent) { - if (DEBUG_DETAIL) - System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent); - graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent); - } else { - if (DEBUG_DETAIL) - System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent); - graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event); - } - return otherEvent; - } - } - - return null; - } - }); - } catch (CancelTransactionException e) { - polling.set(false); - return Status.CANCEL_STATUS; - } catch (DatabaseException e) { - return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e); - } - - return Status.OK_STATUS; - } finally { - monitor.done(); - schedule(RESOLUTION_SCHEDULE_PERIOD); - } - } - - private Resource[] pruneEvents() { - synchronized ( this.events ) { - Resource[] events = this.events.toArray(Resource.NONE); - this.events.clear(); - return events; - } - } - - public void queueEvents(Collection events) { - synchronized (this.events) { - this.events.addAll(events); - } - } - - public void queueEvent(Resource event) { - synchronized (this.events) { - this.events.add(event); - } - } - -} +/******************************************************************************* + * Copyright (c) 2012 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Semantum Oy - initial API and implementation + *******************************************************************************/ +package org.simantics.event.writer; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.DatabaseJob; +import org.simantics.Simantics; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteGraph; +import org.simantics.db.common.primitiverequest.PossibleObject; +import org.simantics.db.common.request.ObjectsWithType; +import org.simantics.db.common.request.WriteRequest; +import org.simantics.db.common.utils.NameUtils; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.layer0.genericrelation.Dependencies; +import org.simantics.db.layer0.genericrelation.IndexQueries; +import org.simantics.db.layer0.request.PossibleModel; +import org.simantics.event.Activator; +import org.simantics.event.ontology.EventResource; +import org.simantics.layer0.Layer0; +import org.simantics.operation.Layer0X; +import org.simantics.scl.runtime.function.Function; +import org.simantics.simulation.ontology.SimulationResource; +import org.simantics.utils.datastructures.MapList; + +/** + * A self-scheduling job that checks queued event resources and tries to resolve + * modules from the model based on the event source module name. + * + *

+ * The job will automatically stop self-scheduling when the experiment run + * referenced by the event producer becomes inactive. + * + *

+ * Must be disposed when no longer used. + * + * @author Tuukka Lehtonen + * + * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all + */ +public class EventSourceResolver extends DatabaseJob { + + /** + * Externalizes the logic of checking whether a resource found from the + * Dependencies index can be an event source. The usual implementation will + * check whether the resource is an instance of a domain specific type. + * + */ + public static interface Filter { + boolean accept(ReadGraph graph, Resource possibleSource) + throws DatabaseException; + } + + private final boolean DEBUG = false; + private final boolean DEBUG_DETAIL = false; + + private static final int RESOLUTION_SCHEDULE_PERIOD = 2000; + + public List events = new ArrayList(); + + private boolean initialEventsResolved = false; + private Resource eventLog; + private Filter eventSourceFilter; + private VirtualGraph virtualGraph; + private AtomicBoolean polling = new AtomicBoolean(true); + + public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) { + super("Event Source Resolver"); + setPriority(DECORATE); + setUser(false); + setSystem(true); + this.virtualGraph = virtualGraph; + this.eventLog = eventLog; + this.eventSourceFilter = eventSourceFilter; + } + + public void dispose() { + if (DEBUG) + System.out.println(this + ": DISPOSE"); + polling.set(false); + } + + @Override + public boolean shouldRun() { + return polling.get() && Simantics.peekSession() != null; + } + + @Override + protected IStatus run(final IProgressMonitor monitor) { + try { + Session session = Simantics.peekSession(); + if (session == null) + return Status.CANCEL_STATUS; + + final Resource[] events = pruneEvents(); + if (events.length == 0 && initialEventsResolved) + return Status.CANCEL_STATUS; + + if (DEBUG) { + System.out.println(this + ": resolve " + events.length + " events"); + if (!initialEventsResolved) + System.out.println(this + ": resolve initial events"); + } + + setThread(Thread.currentThread()); + monitor.beginTask("Resolve " + events.length + " events", events.length); + try { + session.sync(new WriteRequest(virtualGraph) { + Resource model; + @SuppressWarnings("rawtypes") + Function indexFunction; + Layer0 L0; + Layer0X L0X; + EventResource EVENT; + + @Override + public void perform(WriteGraph graph) throws DatabaseException { + setThread(Thread.currentThread()); + + L0 = Layer0.getInstance(graph); + L0X = Layer0X.getInstance(graph); + EVENT = EventResource.getInstance(graph); + + Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer)); + if (run == null) + throw new CancelTransactionException(); + if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive)) + throw new CancelTransactionException(); + + model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf)); + if (model == null) + throw new CancelTransactionException(); + + indexFunction = graph.adapt(L0X.DependencyResources, Function.class); + + if (!initialEventsResolved) { + MapList initialEventsBySource = new MapList(); + for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) { + Collection initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event)); + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events"); + MapList bySource = sortEventsBySource(graph, initialEvents); + for(String key : bySource.getKeys()) { + initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key)); + } + } + for (String sourceName : initialEventsBySource.getKeys()) + resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName)); + initialEventsResolved = true; + } + + MapList eventsBySource = sortEventsBySource(graph, Arrays.asList(events)); + for (String sourceName : eventsBySource.getKeys()) { + resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName)); + monitor.worked(1); + } + } + + private MapList sortEventsBySource(ReadGraph graph, Collection events) throws DatabaseException { + MapList result = new MapList(); + for (Resource event : events) { + Collection sources = graph.getObjects(event, EVENT.Event_source); + if (!sources.isEmpty()) + continue; + + String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName); + if (sourceName != null && !sourceName.trim().isEmpty()) + result.add(sourceName, event); + } + return result; + } + + @SuppressWarnings({ "unchecked" }) + private void resolveEvents(WriteGraph graph, String sourceName, Collection eventsForSource) throws DatabaseException { + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName); + if (sourceName == null) + throw new NullPointerException("null source name"); + + monitor.subTask("Resolve events with source " + sourceName); + if (DEBUG) + System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName); + List results = (List) indexFunction.apply(graph, model, + IndexQueries.quoteTerm(Dependencies.FIELD_NAME, sourceName)); + for (Resource r : results) { + if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r)) + continue; + Resource rModel = graph.sync(new PossibleModel(r)); + if (model.equals(rModel)) { + if (DEBUG) + System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true)); + + for (Resource event : eventsForSource) { + Resource otherEvent = resolveAndMarkEventPair(graph, r, event); + + if (otherEvent == null) { + // No match for this event, add also inverse of Event_source relation. + graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r); + } else { + // Found the event that matches this one, remove Event_source_inverse to + // prevent bloating the event source resource with unnecessary statements. + graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r); + graph.claim(otherEvent, EVENT.Event_source, null, r); + graph.claim(event, EVENT.Event_source, null, r); + } + } + } + } + } + + private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException { + Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index); + boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent); + + for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) { + Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index); + boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent); + if (index != null && index.equals(eventIndex) && returnEvent != ret) { + if (returnEvent) { + if (DEBUG_DETAIL) + System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent); + graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent); + } else { + if (DEBUG_DETAIL) + System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent); + graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event); + } + return otherEvent; + } + } + + return null; + } + }); + } catch (CancelTransactionException e) { + polling.set(false); + return Status.CANCEL_STATUS; + } catch (DatabaseException e) { + return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e); + } + + return Status.OK_STATUS; + } finally { + monitor.done(); + schedule(RESOLUTION_SCHEDULE_PERIOD); + } + } + + private Resource[] pruneEvents() { + synchronized ( this.events ) { + Resource[] events = this.events.toArray(Resource.NONE); + this.events.clear(); + return events; + } + } + + public void queueEvents(Collection events) { + synchronized (this.events) { + this.events.addAll(events); + } + } + + public void queueEvent(Resource event) { + synchronized (this.events) { + this.events.add(event); + } + } + +}