1 /*******************************************************************************
\r
2 * Copyright (c) 2012 Association for Decentralized Information Management
\r
3 * in Industry THTH ry.
\r
4 * All rights reserved. This program and the accompanying materials
\r
5 * are made available under the terms of the Eclipse Public License v1.0
\r
6 * which accompanies this distribution, and is available at
\r
7 * http://www.eclipse.org/legal/epl-v10.html
\r
10 * Semantum Oy - initial API and implementation
\r
11 *******************************************************************************/
\r
12 package org.simantics.event.writer;
\r
14 import java.util.ArrayList;
\r
15 import java.util.List;
\r
16 import java.util.concurrent.atomic.AtomicBoolean;
\r
18 import org.eclipse.core.runtime.IProgressMonitor;
\r
19 import org.eclipse.core.runtime.IStatus;
\r
20 import org.eclipse.core.runtime.Status;
\r
21 import org.simantics.DatabaseJob;
\r
22 import org.simantics.Simantics;
\r
23 import org.simantics.db.ReadGraph;
\r
24 import org.simantics.db.Resource;
\r
25 import org.simantics.db.Session;
\r
26 import org.simantics.db.VirtualGraph;
\r
27 import org.simantics.db.WriteOnlyGraph;
\r
28 import org.simantics.db.common.request.UniqueRead;
\r
29 import org.simantics.db.common.request.WriteOnlyRequest;
\r
30 import org.simantics.db.exception.CancelTransactionException;
\r
31 import org.simantics.db.exception.DatabaseException;
\r
32 import org.simantics.db.request.WriteOnly;
\r
33 import org.simantics.event.Activator;
\r
34 import org.simantics.event.util.EventUtils;
\r
35 import org.simantics.event.util.EventWriteData;
\r
38 * A self-scheduling job that queues up {@link EventWriteTask} instances and
\r
39 * periodically executes the queued up tasks in a single graph {@link WriteOnly}
\r
43 * Intended to be constructed and scheduled by experiment implementations.
\r
46 * Must be disposed when no longer used.
\r
48 * @author Tuukka Lehtonen
\r
50 * @see EventWriteTask
\r
51 * @see EventSourceResolver
\r
53 public class EventWriterJob extends DatabaseJob {
\r
55 private final boolean DEBUG = false;
\r
57 private static final int WRITE_SCHEDULE_PERIOD = 1000;
\r
59 private static final long MIN_QUEUE_QUIET_TIME = 1000L;
\r
61 private static final long MIN_WRITE_QUIET_TIME = 1000L;
\r
63 private static final int MAX_TASK_QUEUE_SIZE = 500;
\r
65 private static final EventWriteTask[] NONE = {};
\r
67 private List<EventWriteTask> tasks = new ArrayList<EventWriteTask>();
\r
69 private VirtualGraph virtualGraph;
\r
70 private Resource eventLog;
\r
71 private EventSourceResolver resolver;
\r
73 private AtomicBoolean scheduled = new AtomicBoolean(false);
\r
74 private AtomicBoolean polling = new AtomicBoolean(true);
\r
76 private long lastWriteTime = Long.MIN_VALUE;
\r
77 private long lastQueueTime = Long.MIN_VALUE;
\r
/**
 * Creates the job under the name "Event Writer", gives it the {@code DECORATE}
 * job priority and stores the collaborators used later by {@code run}.
 *
 * NOTE(review): the listing skips two original lines (82-83) inside this
 * constructor; whatever they contain is not described here.
 *
 * @param virtualGraph virtual graph passed on to the {@code EventWriteData}
 *        lookup and to the write-only request in {@code run}
 * @param eventLog event log resource passed on to {@code EventWriteData}
 * @param resolver receiver of the written event resources; {@code run}
 *        checks it against {@code null} before queueing events to it
 */
79 public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {
\r
80 super("Event Writer");
\r
81 setPriority(DECORATE);
\r
84 this.virtualGraph = virtualGraph;
\r
85 this.eventLog = eventLog;
\r
86 this.resolver = resolver;
\r
/**
 * Disposes this job.
 *
 * NOTE(review): only the diagnostic print below is visible in this listing;
 * original lines 90 and 92 are missing, so the print is presumably guarded by
 * {@code DEBUG} and any actual shutdown statement is not shown -- confirm
 * against the full file.
 */
89 public void dispose() {
\r
91 System.out.println(this + ": DISPOSE");
\r
/**
 * Registers a task for a later write and makes sure the job is scheduled.
 *
 * NOTE(review): original line 97 is missing from this listing; the statement
 * that actually appends {@code task} to {@code tasks} is presumably there --
 * confirm against the full file.
 *
 * @param task the event write task to queue
 */
95 public void queue(EventWriteTask task) {
\r
// The task list doubles as the lock for queue state.
96 synchronized (tasks) {
\r
// Remember when the queue was last touched; run() derives the queue quiet time from this.
98 this.lastQueueTime = System.currentTimeMillis();
\r
// Only the caller that flips 'scheduled' from false to true schedules the job.
100 if (scheduled.compareAndSet(false, true)) {
\r
101 schedule(WRITE_SCHEDULE_PERIOD);
\r
107 public boolean shouldRun() {
\r
108 return polling.get() && Simantics.peekSession() != null;
\r
/**
 * Flushes the queued event write tasks to the database when a flush is due.
 *
 * A flush is due when at least one of these holds: the queue has been quiet
 * for {@code MIN_QUEUE_QUIET_TIME}, the last write is at least
 * {@code MIN_WRITE_QUIET_TIME} old, or {@code MAX_TASK_QUEUE_SIZE} tasks are
 * queued. The drained tasks are then written inside one write-only request
 * on {@code virtualGraph}.
 *
 * NOTE(review): this listing omits many short original lines (braces,
 * {@code try}/{@code finally} scaffolding, probable {@code if (DEBUG)}
 * guards and loop bookkeeping), so the exact nesting of the statements below
 * must be confirmed against the full file.
 *
 * @param monitor progress monitor; a "Flush Events" task sized by the number
 *        of drained tasks is begun on it
 * @return {@code Status.CANCEL_STATUS} when there is no session, nothing is
 *         queued, no flush is due or the transaction was cancelled; an
 *         {@code IStatus.ERROR} status wrapping the {@code DatabaseException}
 *         when writing fails; {@code Status.OK_STATUS} otherwise
 */
112 protected IStatus run(final IProgressMonitor monitor) {
\r
// Nothing can be written without a database session.
114 Session session = Simantics.peekSession();
\r
115 if (session == null)
\r
116 return Status.CANCEL_STATUS;
\r
// Nothing to do when the queue is empty.
118 int taskCount = countTasks();
\r
119 if (taskCount == 0)
\r
120 return Status.CANCEL_STATUS;
\r
122 long currentTime = System.currentTimeMillis();
\r
124 // Must flush write at some point if no flush has been performed yet.
\r
// Long.MIN_VALUE marks "never written"; the write quiet time is measured from now in that case.
125 if (lastWriteTime == Long.MIN_VALUE)
\r
126 lastWriteTime = currentTime;
\r
// Evaluate the three independent flush conditions.
128 long queueQuietTime = currentTime - lastQueueTime;
\r
129 long writeQuietTime = currentTime - lastWriteTime;
\r
130 boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;
\r
131 boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;
\r
132 boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;
\r
// Diagnostic output; presumably guarded by DEBUG on a line missing from this listing.
135 System.out.println("Queue quiet time: " + queueQuietTime);
\r
136 System.out.println("Write quiet time: " + writeQuietTime);
\r
137 System.out.println("Task count: " + taskCount);
\r
// Skip this round when none of the flush conditions holds.
140 if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {
\r
141 return Status.CANCEL_STATUS;
\r
// Diagnostic output naming the condition(s) that triggered the flush.
145 if (queueQuietTimePassed)
\r
146 System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);
\r
147 if (writeQuietTimePassed)
\r
148 System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);
\r
149 if (taskCountExceeded)
\r
150 System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);
\r
// Take the pending tasks out of the queue; this local array shadows the 'tasks' field from here on.
153 final EventWriteTask[] tasks = pruneTasks();
\r
154 if (tasks.length == 0)
\r
155 return Status.CANCEL_STATUS;
\r
158 System.out.println(this + ": about to write " + tasks.length + " events");
\r
159 setThread(Thread.currentThread());
\r
161 monitor.beginTask("Flush Events", tasks.length);
\r
// Synchronous read request that builds the EventWriteData for the event log and virtual graph.
165 final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {
\r
168 public EventWriteData perform(ReadGraph graph) throws DatabaseException {
\r
169 return new EventWriteData(graph, eventLog, virtualGraph);
\r
// All drained tasks are written inside this one write-only request on the virtual graph.
174 session.sync(new WriteOnlyRequest(virtualGraph) {
\r
176 public void perform(WriteOnlyGraph graph) throws DatabaseException {
\r
178 setThread(Thread.currentThread());
\r
// Resources of the written events, later handed to the resolver.
179 List<Resource> events = new ArrayList<Resource>(tasks.length);
\r
180 for (EventWriteTask task : tasks) {
\r
182 writeData.prepareToWrite(graph);
\r
// The task writes itself at the slice/position currently exposed by writeData.
184 Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);
\r
// A null result is not counted as written.
// NOTE(review): the line adding 'event' to 'events' is presumably on the missing line 186.
185 if (event != null) {
\r
187 writeData.written();
\r
193 writeData.commit(graph);
\r
// A resolver is optional; when present it receives the collected event resources.
195 if (resolver != null) {
\r
197 System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");
\r
198 resolver.queueEvents(events);
\r
// Record the time of this write; the next run measures the write quiet time from it.
202 lastWriteTime = System.currentTimeMillis();
\r
// A cancelled transaction turns polling off, so shouldRun() returns false afterwards.
203 } catch (CancelTransactionException e) {
\r
204 polling.set(false);
\r
205 return Status.CANCEL_STATUS;
\r
// Any other database failure is reported as an error status carrying the exception.
206 } catch (DatabaseException e) {
\r
207 return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);
\r
210 return Status.OK_STATUS;
\r
// NOTE(review): this reschedule follows return statements, so it presumably sits in a
// finally block whose scaffolding is missing from this listing -- confirm.
213 schedule(WRITE_SCHEDULE_PERIOD);
\r
217 private int countTasks() {
\r
218 synchronized (tasks) {
\r
219 return tasks.size();
\r
223 private EventWriteTask[] pruneTasks() {
\r
224 synchronized (this.tasks) {
\r
225 EventWriteTask[] tasks = this.tasks.toArray(NONE);
\r
226 this.tasks.clear();
\r