/******************************************************************************* * Copyright (c) 2012 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 * which accompanies this distribution, and is available at * http://www.eclipse.org/legal/epl-v10.html * * Contributors: * Semantum Oy - initial API and implementation *******************************************************************************/ package org.simantics.event.writer; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.core.runtime.IProgressMonitor; import org.eclipse.core.runtime.IStatus; import org.eclipse.core.runtime.Status; import org.simantics.DatabaseJob; import org.simantics.Simantics; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteGraph; import org.simantics.db.common.primitiverequest.PossibleObject; import org.simantics.db.common.request.ObjectsWithType; import org.simantics.db.common.request.WriteRequest; import org.simantics.db.common.utils.NameUtils; import org.simantics.db.exception.CancelTransactionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.layer0.genericrelation.Dependencies; import org.simantics.db.layer0.genericrelation.IndexQueries; import org.simantics.db.layer0.request.PossibleModel; import org.simantics.event.Activator; import org.simantics.event.ontology.EventResource; import org.simantics.layer0.Layer0; import org.simantics.operation.Layer0X; import org.simantics.scl.runtime.function.Function; import org.simantics.simulation.ontology.SimulationResource; import org.simantics.utils.datastructures.MapList; /** * A self-scheduling job that checks queued event resources and tries to resolve * modules from the model based on the event source module name. * *

* The job will automatically stop self-scheduling when the experiment run * referenced by the event producer becomes inactive. * *

* Must be disposed when no longer used. * * @author Tuukka Lehtonen * * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all */ public class EventSourceResolver extends DatabaseJob { /** * Externalizes the logic of checking whether a resource found from the * Dependencies index can be an event source. The usual implementation will * check whether the resource is an instance of a domain specific type. * */ public static interface Filter { boolean accept(ReadGraph graph, Resource possibleSource) throws DatabaseException; } private final boolean DEBUG = false; private final boolean DEBUG_DETAIL = false; private static final int RESOLUTION_SCHEDULE_PERIOD = 2000; public List events = new ArrayList(); private boolean initialEventsResolved = false; private Resource eventLog; private Filter eventSourceFilter; private VirtualGraph virtualGraph; private AtomicBoolean polling = new AtomicBoolean(true); public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) { super("Event Source Resolver"); setPriority(DECORATE); setUser(false); setSystem(true); this.virtualGraph = virtualGraph; this.eventLog = eventLog; this.eventSourceFilter = eventSourceFilter; } public void dispose() { if (DEBUG) System.out.println(this + ": DISPOSE"); polling.set(false); } @Override public boolean shouldRun() { return polling.get() && Simantics.peekSession() != null; } @Override protected IStatus run(final IProgressMonitor monitor) { try { Session session = Simantics.peekSession(); if (session == null) return Status.CANCEL_STATUS; final Resource[] events = pruneEvents(); if (events.length == 0 && initialEventsResolved) return Status.CANCEL_STATUS; if (DEBUG) { System.out.println(this + ": resolve " + events.length + " events"); if (!initialEventsResolved) System.out.println(this + ": resolve initial events"); } setThread(Thread.currentThread()); monitor.beginTask("Resolve " + events.length + " events", events.length); try { session.sync(new WriteRequest(virtualGraph) { Resource model; @SuppressWarnings("rawtypes") Function indexFunction; Layer0 L0; Layer0X L0X; EventResource EVENT; @Override public void perform(WriteGraph graph) throws DatabaseException { setThread(Thread.currentThread()); L0 = Layer0.getInstance(graph); L0X = Layer0X.getInstance(graph); EVENT = EventResource.getInstance(graph); Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer)); if (run == null) throw new CancelTransactionException(); if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive)) throw new CancelTransactionException(); model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf)); if (model == null) throw new CancelTransactionException(); indexFunction = graph.adapt(L0X.DependencyResources, Function.class); if (!initialEventsResolved) { MapList initialEventsBySource = new MapList(); for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) { Collection initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event)); if (DEBUG) System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events"); MapList bySource = sortEventsBySource(graph, initialEvents); for(String key : bySource.getKeys()) { initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key)); } } for (String sourceName : initialEventsBySource.getKeys()) resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName)); initialEventsResolved = true; } MapList eventsBySource = sortEventsBySource(graph, Arrays.asList(events)); for (String sourceName : eventsBySource.getKeys()) { resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName)); monitor.worked(1); } } private MapList sortEventsBySource(ReadGraph graph, Collection events) throws DatabaseException { MapList result = new MapList(); for (Resource event : events) { Collection sources = graph.getObjects(event, EVENT.Event_source); if (!sources.isEmpty()) continue; String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName); if (sourceName != null && !sourceName.trim().isEmpty()) result.add(sourceName, event); } return result; } @SuppressWarnings({ "unchecked" }) private void resolveEvents(WriteGraph graph, String sourceName, Collection eventsForSource) throws DatabaseException { if (DEBUG) System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName); if (sourceName == null) throw new NullPointerException("null source name"); monitor.subTask("Resolve events with source " + sourceName); if (DEBUG) System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName); List results = (List) indexFunction.apply(graph, model, IndexQueries.quoteTerm(Dependencies.FIELD_NAME, sourceName)); for (Resource r : results) { if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r)) continue; Resource rModel = graph.sync(new PossibleModel(r)); if (model.equals(rModel)) { if (DEBUG) System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true)); for (Resource event : eventsForSource) { Resource otherEvent = resolveAndMarkEventPair(graph, r, event); if (otherEvent == null) { // No match for this event, add also inverse of Event_source relation. graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r); } else { // Found the event that matches this one, remove Event_source_inverse to // prevent bloating the event source resource with unnecessary statements. graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r); graph.claim(otherEvent, EVENT.Event_source, null, r); graph.claim(event, EVENT.Event_source, null, r); } } } } } private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException { Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index); boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent); for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) { Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index); boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent); if (index != null && index.equals(eventIndex) && returnEvent != ret) { if (returnEvent) { if (DEBUG_DETAIL) System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent); graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent); } else { if (DEBUG_DETAIL) System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent); graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event); } return otherEvent; } } return null; } }); } catch (CancelTransactionException e) { polling.set(false); return Status.CANCEL_STATUS; } catch (DatabaseException e) { return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e); } return Status.OK_STATUS; } finally { monitor.done(); schedule(RESOLUTION_SCHEDULE_PERIOD); } } private Resource[] pruneEvents() { synchronized ( this.events ) { Resource[] events = this.events.toArray(Resource.NONE); this.events.clear(); return events; } } public void queueEvents(Collection events) { synchronized (this.events) { this.events.addAll(events); } } public void queueEvent(Resource event) { synchronized (this.events) { this.events.add(event); } } }