]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java
Fixed multiple issues causing dangling references to discarded queries
[simantics/platform.git] / bundles / org.simantics.event / src / org / simantics / event / writer / EventWriterJob.java
1 /*******************************************************************************
2  * Copyright (c) 2012 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     Semantum Oy - initial API and implementation
11  *******************************************************************************/
12 package org.simantics.event.writer;
13
14 import java.util.ArrayList;
15 import java.util.List;
16 import java.util.concurrent.atomic.AtomicBoolean;
17
18 import org.eclipse.core.runtime.IProgressMonitor;
19 import org.eclipse.core.runtime.IStatus;
20 import org.eclipse.core.runtime.Status;
21 import org.simantics.DatabaseJob;
22 import org.simantics.Simantics;
23 import org.simantics.db.ReadGraph;
24 import org.simantics.db.Resource;
25 import org.simantics.db.Session;
26 import org.simantics.db.VirtualGraph;
27 import org.simantics.db.WriteOnlyGraph;
28 import org.simantics.db.common.request.UniqueRead;
29 import org.simantics.db.common.request.WriteOnlyRequest;
30 import org.simantics.db.exception.CancelTransactionException;
31 import org.simantics.db.exception.DatabaseException;
32 import org.simantics.db.request.WriteOnly;
33 import org.simantics.event.Activator;
34 import org.simantics.event.util.EventUtils;
35 import org.simantics.event.util.EventWriteData;
36
37 /**
38  * A self-scheduling job that queues up {@link EventWriteTask} instances and
39  * periodically executes the queued up tasks in a single graph {@link WriteOnly}
40  * request.
41  * 
42  * <p>
43  * Intended to be constructed and scheduled by experiment implementations.
44  * 
45  * <p>
46  * Must be disposed when no longer used.
47  * 
48  * @author Tuukka Lehtonen
49  * 
50  * @see EventWriteTask
51  * @see EventSourceResolver
52  */
53 public class EventWriterJob extends DatabaseJob {
54
55     private final boolean                 DEBUG                 = false;
56
57     private static final int              WRITE_SCHEDULE_PERIOD = 1000;
58
59     private static final long             MIN_QUEUE_QUIET_TIME  = 1000L;
60
61     private static final long             MIN_WRITE_QUIET_TIME  = 1000L;
62
63     private static final int              MAX_TASK_QUEUE_SIZE   = 500;
64
65     private static final EventWriteTask[] NONE                  = {};
66
67     private List<EventWriteTask>          tasks                 = new ArrayList<EventWriteTask>();
68
69     private VirtualGraph                  virtualGraph;
70     private Resource                      eventLog;
71     private EventSourceResolver           resolver;
72
73     private AtomicBoolean                 scheduled             = new AtomicBoolean(false);
74     private AtomicBoolean                 polling               = new AtomicBoolean(true);
75
76     private long                          lastWriteTime         = Long.MIN_VALUE;
77     private long                          lastQueueTime         = Long.MIN_VALUE;
78
79     public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {
80         super("Event Writer");
81         setPriority(DECORATE);
82         setUser(false);
83         setSystem(false);
84         this.virtualGraph = virtualGraph;
85         this.eventLog = eventLog;
86         this.resolver = resolver;
87     }
88
89     public void dispose() {
90         if (DEBUG)
91             System.out.println(this + ": DISPOSE");
92         polling.set(false);
93     }
94
95     public void queue(EventWriteTask task) {
96         synchronized (tasks) {
97             tasks.add(task);
98             this.lastQueueTime = System.currentTimeMillis();
99
100             if (scheduled.compareAndSet(false, true)) {
101                 schedule(WRITE_SCHEDULE_PERIOD);
102             }
103         }
104     }
105
106     @Override
107     public boolean shouldRun() {
108         return polling.get() && Simantics.peekSession() != null;
109     }
110
111     @Override
112     protected IStatus run(final IProgressMonitor monitor) {
113         try {
114             Session session = Simantics.peekSession();
115             if (session == null)
116                 return Status.CANCEL_STATUS;
117
118             int taskCount = countTasks();
119             if (taskCount == 0)
120                 return Status.CANCEL_STATUS;
121
122             long currentTime = System.currentTimeMillis();
123
124             // Must flush write at some point if no flush has been performed yet.
125             if (lastWriteTime == Long.MIN_VALUE)
126                 lastWriteTime = currentTime;
127
128             long queueQuietTime = currentTime - lastQueueTime;
129             long writeQuietTime = currentTime - lastWriteTime;
130             boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;
131             boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;
132             boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;
133
134             if (DEBUG) {
135                 System.out.println("Queue quiet time: " + queueQuietTime);
136                 System.out.println("Write quiet time: " + writeQuietTime);
137                 System.out.println("Task count: " + taskCount);
138             }
139
140             if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {
141                 return Status.CANCEL_STATUS;
142             }
143
144             if (DEBUG) {
145                 if (queueQuietTimePassed)
146                     System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);
147                 if (writeQuietTimePassed)
148                     System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);
149                 if (taskCountExceeded)
150                     System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);
151             }
152
153             final EventWriteTask[] tasks = pruneTasks();
154             if (tasks.length == 0)
155                 return Status.CANCEL_STATUS;
156
157             if (DEBUG)
158                 System.out.println(this + ": about to write " + tasks.length + " events");
159             setThread(Thread.currentThread());
160
161             monitor.beginTask("Flush Events", tasks.length);
162             
163             try {
164                 
165                 final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {
166
167                         @Override
168                         public EventWriteData perform(ReadGraph graph) throws DatabaseException {
169                                 return new EventWriteData(graph, eventLog, virtualGraph);
170                         }
171                         
172                 });     
173
174                 session.sync(new WriteOnlyRequest(virtualGraph) {
175                     @Override
176                     public void perform(WriteOnlyGraph graph) throws DatabaseException {
177                         
178                         setThread(Thread.currentThread());
179                         List<Resource> events = new ArrayList<Resource>(tasks.length);
180                         for (EventWriteTask task : tasks) {
181
182                                 writeData.prepareToWrite(graph);
183                             
184                             Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);
185                             if (event != null) {
186                                 events.add(event);
187                                 writeData.written();
188                             }
189                             monitor.worked(1);
190                             
191                         }
192                         
193                         writeData.commit(graph);
194                         
195                         if (resolver != null) {
196                             if (DEBUG)
197                                 System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");
198                             resolver.queueEvents(events);
199                         }
200                     }
201                 });
202                 lastWriteTime = System.currentTimeMillis();
203             } catch (CancelTransactionException e) {
204                 polling.set(false);
205                 return Status.CANCEL_STATUS;
206             } catch (DatabaseException e) {
207                 return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);
208             }
209
210             return Status.OK_STATUS;
211         } finally {
212             monitor.done();
213             schedule(WRITE_SCHEDULE_PERIOD);
214         }
215     }
216
217     private int countTasks() {
218         synchronized (tasks) {
219             return tasks.size();
220         }
221     }
222
223     private EventWriteTask[] pruneTasks() {
224         synchronized (this.tasks) {
225             EventWriteTask[] tasks = this.tasks.toArray(NONE);
226             this.tasks.clear();
227             return tasks;
228         }
229     }
230
231 }