1 /*******************************************************************************
2 * Copyright (c) 2012 Association for Decentralized Information Management
4 * All rights reserved. This program and the accompanying materials
5 * are made available under the terms of the Eclipse Public License v1.0
6 * which accompanies this distribution, and is available at
7 * http://www.eclipse.org/legal/epl-v10.html
10 * Semantum Oy - initial API and implementation
11 *******************************************************************************/
12 package org.simantics.event.writer;
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.List;
19 import java.util.concurrent.atomic.AtomicBoolean;
21 import org.eclipse.core.runtime.IProgressMonitor;
22 import org.eclipse.core.runtime.IStatus;
23 import org.eclipse.core.runtime.Status;
24 import org.simantics.DatabaseJob;
25 import org.simantics.Simantics;
26 import org.simantics.db.ReadGraph;
27 import org.simantics.db.Resource;
28 import org.simantics.db.Session;
29 import org.simantics.db.VirtualGraph;
30 import org.simantics.db.WriteGraph;
31 import org.simantics.db.common.primitiverequest.PossibleObject;
32 import org.simantics.db.common.request.ObjectsWithType;
33 import org.simantics.db.common.request.WriteRequest;
34 import org.simantics.db.common.utils.NameUtils;
35 import org.simantics.db.exception.CancelTransactionException;
36 import org.simantics.db.exception.DatabaseException;
37 import org.simantics.db.layer0.genericrelation.Dependencies;
38 import org.simantics.db.layer0.request.PossibleModel;
39 import org.simantics.event.Activator;
40 import org.simantics.event.ontology.EventResource;
41 import org.simantics.layer0.Layer0;
42 import org.simantics.operation.Layer0X;
43 import org.simantics.scl.runtime.function.Function;
44 import org.simantics.simulation.ontology.SimulationResource;
45 import org.simantics.utils.datastructures.MapList;
48 * A self-scheduling job that checks queued event resources and tries to resolve
49 * modules from the model based on the event source module name.
52 * The job will automatically stop self-scheduling when the experiment run
53 * referenced by the event producer becomes inactive.
56 * Must be disposed when no longer used.
58 * @author Tuukka Lehtonen
60 * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all
62 public class EventSourceResolver extends DatabaseJob {
65 * Externalizes the logic of checking whether a resource found from the
66 * Dependencies index can be an event source. The usual implementation will
67 * check whether the resource is an instance of a domain specific type.
70 public static interface Filter {
71 boolean accept(ReadGraph graph, Resource possibleSource)
72 throws DatabaseException;
75 private final boolean DEBUG = false;
76 private final boolean DEBUG_DETAIL = false;
78 private static final int RESOLUTION_SCHEDULE_PERIOD = 2000;
80 public List<Resource> events = new ArrayList<Resource>();
82 private boolean initialEventsResolved = false;
83 private Resource eventLog;
84 private Filter eventSourceFilter;
85 private VirtualGraph virtualGraph;
86 private AtomicBoolean polling = new AtomicBoolean(true);
88 public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) {
89 super("Event Source Resolver");
90 setPriority(DECORATE);
93 this.virtualGraph = virtualGraph;
94 this.eventLog = eventLog;
95 this.eventSourceFilter = eventSourceFilter;
98 public void dispose() {
100 System.out.println(this + ": DISPOSE");
105 public boolean shouldRun() {
106 return polling.get() && Simantics.peekSession() != null;
110 protected IStatus run(final IProgressMonitor monitor) {
112 Session session = Simantics.peekSession();
114 return Status.CANCEL_STATUS;
116 final Resource[] events = pruneEvents();
117 if (events.length == 0 && initialEventsResolved)
118 return Status.CANCEL_STATUS;
121 System.out.println(this + ": resolve " + events.length + " events");
122 if (!initialEventsResolved)
123 System.out.println(this + ": resolve initial events");
126 setThread(Thread.currentThread());
127 monitor.beginTask("Resolve " + events.length + " events", events.length);
129 session.sync(new WriteRequest(virtualGraph) {
131 @SuppressWarnings("rawtypes")
132 Function indexFunction;
138 public void perform(WriteGraph graph) throws DatabaseException {
139 setThread(Thread.currentThread());
141 L0 = Layer0.getInstance(graph);
142 L0X = Layer0X.getInstance(graph);
143 EVENT = EventResource.getInstance(graph);
145 Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer));
147 throw new CancelTransactionException();
148 if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive))
149 throw new CancelTransactionException();
151 model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf));
153 throw new CancelTransactionException();
155 indexFunction = graph.adapt(L0X.Dependencies, Function.class);
157 if (!initialEventsResolved) {
158 MapList<String, Resource> initialEventsBySource = new MapList<String,Resource>();
159 for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) {
160 Collection<Resource> initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event));
162 System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events");
163 MapList<String, Resource> bySource = sortEventsBySource(graph, initialEvents);
164 for(String key : bySource.getKeys()) {
165 initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key));
168 for (String sourceName : initialEventsBySource.getKeys())
169 resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName));
170 initialEventsResolved = true;
173 MapList<String, Resource> eventsBySource = sortEventsBySource(graph, Arrays.asList(events));
174 for (String sourceName : eventsBySource.getKeys()) {
175 resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName));
180 private MapList<String, Resource> sortEventsBySource(ReadGraph graph, Collection<Resource> events) throws DatabaseException {
181 MapList<String, Resource> result = new MapList<String, Resource>();
182 for (Resource event : events) {
183 Collection<Resource> sources = graph.getObjects(event, EVENT.Event_source);
184 if (!sources.isEmpty())
187 String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName);
188 if (sourceName != null && !sourceName.trim().isEmpty())
189 result.add(sourceName, event);
194 @SuppressWarnings({ "unchecked" })
195 private void resolveEvents(WriteGraph graph, String sourceName, Collection<Resource> eventsForSource) throws DatabaseException {
197 System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName);
198 if (sourceName == null)
199 throw new NullPointerException("null source name");
201 monitor.subTask("Resolve events with source " + sourceName);
203 System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName);
204 List<Map<String, Object>> results = (List<Map<String, Object>>) indexFunction.apply(graph, model, "Name:" + sourceName);
205 for (Map<String, Object> result : results) {
206 Resource r = (Resource) result.get(Dependencies.FIELD_RESOURCE);
207 if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r))
209 Resource rModel = graph.sync(new PossibleModel(r));
210 if (model.equals(rModel)) {
212 System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true));
214 for (Resource event : eventsForSource) {
215 Resource otherEvent = resolveAndMarkEventPair(graph, r, event);
217 if (otherEvent == null) {
218 // No match for this event, add also inverse of Event_source relation.
219 graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r);
221 // Found the event that matches this one, remove Event_source_inverse to
222 // prevent bloating the event source resource with unnecessary statements.
223 graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r);
224 graph.claim(otherEvent, EVENT.Event_source, null, r);
225 graph.claim(event, EVENT.Event_source, null, r);
232 private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException {
233 Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index);
234 boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent);
236 for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) {
237 Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index);
238 boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent);
239 if (index != null && index.equals(eventIndex) && returnEvent != ret) {
242 System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent);
243 graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent);
246 System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent);
247 graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event);
256 } catch (CancelTransactionException e) {
258 return Status.CANCEL_STATUS;
259 } catch (DatabaseException e) {
260 return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e);
263 return Status.OK_STATUS;
266 schedule(RESOLUTION_SCHEDULE_PERIOD);
270 private Resource[] pruneEvents() {
271 synchronized ( this.events ) {
272 Resource[] events = this.events.toArray(Resource.NONE);
278 public void queueEvents(Collection<Resource> events) {
279 synchronized (this.events) {
280 this.events.addAll(events);
284 public void queueEvent(Resource event) {
285 synchronized (this.events) {
286 this.events.add(event);