Index query fixes after commit 5e340942
[simantics/platform.git] / bundles / org.simantics.event / src / org / simantics / event / writer / EventSourceResolver.java
1 /*******************************************************************************
2  * Copyright (c) 2012 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     Semantum Oy - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.event.writer;
13
14 import java.util.ArrayList;
15 import java.util.Arrays;
16 import java.util.Collection;
17 import java.util.List;
18 import java.util.concurrent.atomic.AtomicBoolean;
19
20 import org.eclipse.core.runtime.IProgressMonitor;
21 import org.eclipse.core.runtime.IStatus;
22 import org.eclipse.core.runtime.Status;
23 import org.simantics.DatabaseJob;
24 import org.simantics.Simantics;
25 import org.simantics.db.ReadGraph;
26 import org.simantics.db.Resource;
27 import org.simantics.db.Session;
28 import org.simantics.db.VirtualGraph;
29 import org.simantics.db.WriteGraph;
30 import org.simantics.db.common.primitiverequest.PossibleObject;
31 import org.simantics.db.common.request.ObjectsWithType;
32 import org.simantics.db.common.request.WriteRequest;
33 import org.simantics.db.common.utils.NameUtils;
34 import org.simantics.db.exception.CancelTransactionException;
35 import org.simantics.db.exception.DatabaseException;
36 import org.simantics.db.layer0.genericrelation.Dependencies;
37 import org.simantics.db.layer0.genericrelation.IndexQueries;
38 import org.simantics.db.layer0.request.PossibleModel;
39 import org.simantics.event.Activator;
40 import org.simantics.event.ontology.EventResource;
41 import org.simantics.layer0.Layer0;
42 import org.simantics.operation.Layer0X;
43 import org.simantics.scl.runtime.function.Function;
44 import org.simantics.simulation.ontology.SimulationResource;
45 import org.simantics.utils.datastructures.MapList;
46
47 /**
48  * A self-scheduling job that checks queued event resources and tries to resolve
49  * modules from the model based on the event source module name.
50  * 
51  * <p>
52  * The job will automatically stop self-scheduling when the experiment run
53  * referenced by the event producer becomes inactive.
54  * 
55  * <p>
56  * Must be disposed when no longer used.
57  * 
58  * @author Tuukka Lehtonen
59  * 
60  * TODO: optimize return event resolution somehow. Currently forced to perform a linear search of all 
61  */
62 public class EventSourceResolver extends DatabaseJob {
63
64     /**
65      * Externalizes the logic of checking whether a resource found from the
66      * Dependencies index can be an event source. The usual implementation will
67      * check whether the resource is an instance of a domain specific type.
68      * 
69      */
70     public static interface Filter {
71         boolean accept(ReadGraph graph, Resource possibleSource)
72                 throws DatabaseException;
73     }
74
75     private final boolean    DEBUG                      = false;
76     private final boolean    DEBUG_DETAIL               = false;
77
78     private static final int RESOLUTION_SCHEDULE_PERIOD = 2000;
79
80     public List<Resource>    events                     = new ArrayList<Resource>();
81
82     private boolean          initialEventsResolved      = false;
83     private Resource         eventLog;
84     private Filter           eventSourceFilter;
85     private VirtualGraph     virtualGraph;
86     private AtomicBoolean    polling                    = new AtomicBoolean(true);
87
88     public EventSourceResolver(VirtualGraph virtualGraph, Resource eventLog, Filter eventSourceFilter) {
89         super("Event Source Resolver");
90         setPriority(DECORATE);
91         setUser(false);
92         setSystem(true);
93         this.virtualGraph = virtualGraph;
94         this.eventLog = eventLog;
95         this.eventSourceFilter = eventSourceFilter;
96     }
97
98     public void dispose() {
99         if (DEBUG)
100             System.out.println(this + ": DISPOSE");
101         polling.set(false);
102     }
103
104     @Override
105     public boolean shouldRun() {
106         return polling.get() && Simantics.peekSession() != null;
107     }
108
109     @Override
110     protected IStatus run(final IProgressMonitor monitor) {
111         try {
112             Session session = Simantics.peekSession();
113             if (session == null)
114                 return Status.CANCEL_STATUS;
115
116             final Resource[] events = pruneEvents();
117             if (events.length == 0 && initialEventsResolved)
118                 return Status.CANCEL_STATUS;
119
120             if (DEBUG) {
121                 System.out.println(this + ": resolve " + events.length + " events");
122                 if (!initialEventsResolved)
123                     System.out.println(this + ": resolve initial events");
124             }
125
126             setThread(Thread.currentThread());
127             monitor.beginTask("Resolve " + events.length + " events", events.length);
128             try {
129                 session.sync(new WriteRequest(virtualGraph) {
130                     Resource model;
131                     @SuppressWarnings("rawtypes")
132                     Function indexFunction;
133                     Layer0 L0;
134                     Layer0X L0X;
135                     EventResource EVENT;
136
137                     @Override
138                     public void perform(WriteGraph graph) throws DatabaseException {
139                         setThread(Thread.currentThread());
140
141                         L0 = Layer0.getInstance(graph);
142                         L0X = Layer0X.getInstance(graph);
143                         EVENT = EventResource.getInstance(graph);
144
145                         Resource run = graph.sync(new PossibleObject(eventLog, EVENT.HasEventProducer));
146                         if (run == null)
147                             throw new CancelTransactionException();
148                         if (!graph.hasStatement(run, SimulationResource.getInstance(graph).IsActive))
149                             throw new CancelTransactionException();
150
151                         model = graph.sync(new PossibleObject(eventLog, EVENT.IsEventLogOf));
152                         if (model == null)
153                             throw new CancelTransactionException();
154
155                         indexFunction = graph.adapt(L0X.DependencyResources, Function.class);
156
157                         if (!initialEventsResolved) {
158                             MapList<String, Resource> initialEventsBySource = new MapList<String,Resource>();
159                                 for(Resource slice : graph.getObjects(eventLog, L0.ConsistsOf)) {
160                                 Collection<Resource> initialEvents = graph.syncRequest(new ObjectsWithType(slice, L0.ConsistsOf, EVENT.Event));
161                                 if (DEBUG)
162                                     System.out.println(EventSourceResolver.this + ": resolving " + initialEvents.size() + " initial events");
163                                 MapList<String, Resource> bySource = sortEventsBySource(graph, initialEvents);
164                                 for(String key : bySource.getKeys()) {
165                                         initialEventsBySource.addAll(key, bySource.getValuesUnsafe(key));
166                                 }
167                                 }
168                                 for (String sourceName : initialEventsBySource.getKeys())
169                                         resolveEvents(graph, sourceName, initialEventsBySource.getValuesUnsafe(sourceName));
170                             initialEventsResolved = true;
171                         }
172
173                         MapList<String, Resource> eventsBySource = sortEventsBySource(graph, Arrays.asList(events));
174                         for (String sourceName : eventsBySource.getKeys()) {
175                             resolveEvents(graph, sourceName, eventsBySource.getValuesUnsafe(sourceName));
176                             monitor.worked(1);
177                         }
178                     }
179
180                     private MapList<String, Resource> sortEventsBySource(ReadGraph graph, Collection<Resource> events) throws DatabaseException {
181                         MapList<String, Resource> result = new MapList<String, Resource>();
182                         for (Resource event : events) {
183                             Collection<Resource> sources = graph.getObjects(event, EVENT.Event_source);
184                             if (!sources.isEmpty())
185                                 continue;
186
187                             String sourceName = graph.getPossibleRelatedValue(event, EVENT.Event_sourceName);
188                             if (sourceName != null && !sourceName.trim().isEmpty())
189                                 result.add(sourceName, event); 
190                         }
191                         return result;
192                     }
193
194                     @SuppressWarnings({ "unchecked" })
195                     private void resolveEvents(WriteGraph graph, String sourceName, Collection<Resource> eventsForSource) throws DatabaseException {
196                         if (DEBUG)
197                             System.out.println(EventSourceResolver.this + ": resolving " + eventsForSource.size() + " events for source " + sourceName);
198                         if (sourceName == null)
199                             throw new NullPointerException("null source name");
200
201                         monitor.subTask("Resolve events with source " + sourceName);
202                         if (DEBUG)
203                             System.out.println(EventSourceResolver.this + ": resolving source name " + sourceName);
204                         List<Resource> results = (List<Resource>) indexFunction.apply(graph, model,
205                                 IndexQueries.quoteTerm(Dependencies.FIELD_NAME, sourceName));
206                         for (Resource r : results) {
207                             if (eventSourceFilter != null && !eventSourceFilter.accept(graph, r))
208                                 continue;
209                             Resource rModel = graph.sync(new PossibleModel(r));
210                             if (model.equals(rModel)) {
211                                 if (DEBUG)
212                                     System.out.println(EventSourceResolver.this + ": found module " + NameUtils.getSafeName(graph, r, true));
213
214                                 for (Resource event : eventsForSource) {
215                                     Resource otherEvent = resolveAndMarkEventPair(graph, r, event);
216
217                                     if (otherEvent == null) {
218                                         // No match for this event, add also inverse of Event_source relation.
219                                         graph.claim(event, EVENT.Event_source, EVENT.Event_source_inverse, r);
220                                     } else {
221                                         // Found the event that matches this one, remove Event_source_inverse to
222                                         // prevent bloating the event source resource with unnecessary statements.
223                                         graph.deny(otherEvent, EVENT.Event_source, EVENT.Event_source_inverse, r);
224                                         graph.claim(otherEvent, EVENT.Event_source, null, r);
225                                         graph.claim(event, EVENT.Event_source, null, r);
226                                     }
227                                 }
228                             }
229                         }
230                     }
231
232                     private Resource resolveAndMarkEventPair(WriteGraph graph, Resource source, Resource event) throws DatabaseException {
233                         Integer eventIndex = graph.getPossibleRelatedValue(event, EVENT.Event_index);
234                         boolean returnEvent = graph.hasStatement(event, EVENT.ReturnEvent);
235
236                         for (Resource otherEvent : graph.getObjects(source, EVENT.Event_source_inverse)) {
237                             Integer index = graph.getPossibleRelatedValue(otherEvent, EVENT.Event_index);
238                             boolean ret = graph.hasStatement(otherEvent, EVENT.ReturnEvent);
239                             if (index != null && index.equals(eventIndex) && returnEvent != ret) {
240                                 if (returnEvent) {
241                                     if (DEBUG_DETAIL)
242                                         System.out.println(EventSourceResolver.this + ": event " + event + " returns event " + otherEvent);
243                                     graph.claim(event, EVENT.Returns, EVENT.ReturnedBy, otherEvent);
244                                 } else {
245                                     if (DEBUG_DETAIL)
246                                         System.out.println(EventSourceResolver.this + ": event " + event + " is returned by event " + otherEvent);
247                                     graph.claim(otherEvent, EVENT.Returns, EVENT.ReturnedBy, event);
248                                 }
249                                 return otherEvent;
250                             }
251                         }
252
253                         return null;
254                     }
255                 });
256             } catch (CancelTransactionException e) {
257                 polling.set(false);
258                 return Status.CANCEL_STATUS;
259             } catch (DatabaseException e) {
260                 return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event source resolution failed, see exception for details.", e);
261             }
262
263             return Status.OK_STATUS;
264         } finally {
265             monitor.done();
266             schedule(RESOLUTION_SCHEDULE_PERIOD);
267         }
268     }
269
270     private Resource[] pruneEvents() {
271         synchronized ( this.events ) {
272             Resource[] events = this.events.toArray(Resource.NONE);
273             this.events.clear();
274             return events;
275         }
276     }
277
278     public void queueEvents(Collection<Resource> events) {
279         synchronized (this.events) {
280             this.events.addAll(events);
281         }
282     }
283
284     public void queueEvent(Resource event) {
285         synchronized (this.events) {
286             this.events.add(event);
287         }
288     }
289
290 }