X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.event%2Fsrc%2Forg%2Fsimantics%2Fevent%2Fwriter%2FEventWriterJob.java;h=6f57e2a55550a430699557d199e0a9b53d271a24;hp=fb4638dea811b1bf6739ffd0dd798d1cd3fc396c;hb=refs%2Fchanges%2F38%2F238%2F2;hpb=24e2b34260f219f0d1644ca7a138894980e25b14 diff --git a/bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java b/bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java index fb4638dea..6f57e2a55 100644 --- a/bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java +++ b/bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java @@ -1,231 +1,231 @@ -/******************************************************************************* - * Copyright (c) 2012 Association for Decentralized Information Management - * in Industry THTH ry. - * All rights reserved. This program and the accompanying materials - * are made available under the terms of the Eclipse Public License v1.0 - * which accompanies this distribution, and is available at - * http://www.eclipse.org/legal/epl-v10.html - * - * Contributors: - * Semantum Oy - initial API and implementation - *******************************************************************************/ -package org.simantics.event.writer; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.eclipse.core.runtime.IProgressMonitor; -import org.eclipse.core.runtime.IStatus; -import org.eclipse.core.runtime.Status; -import org.simantics.DatabaseJob; -import org.simantics.Simantics; -import org.simantics.db.ReadGraph; -import org.simantics.db.Resource; -import org.simantics.db.Session; -import org.simantics.db.VirtualGraph; -import org.simantics.db.WriteOnlyGraph; -import org.simantics.db.common.request.UniqueRead; -import org.simantics.db.common.request.WriteOnlyRequest; -import org.simantics.db.exception.CancelTransactionException; -import org.simantics.db.exception.DatabaseException; -import org.simantics.db.request.WriteOnly; -import org.simantics.event.Activator; -import org.simantics.event.util.EventUtils; -import org.simantics.event.util.EventWriteData; - -/** - * A self-scheduling job that queues up {@link EventWriteTask} instances and - * periodically executes the queued up tasks in a single graph {@link WriteOnly} - * request. - * - *

- * Intended to be constructed and scheduled by experiment implementations. - * - *

- * Must be disposed when no longer used. - * - * @author Tuukka Lehtonen - * - * @see EventWriteTask - * @see EventSourceResolver - */ -public class EventWriterJob extends DatabaseJob { - - private final boolean DEBUG = false; - - private static final int WRITE_SCHEDULE_PERIOD = 1000; - - private static final long MIN_QUEUE_QUIET_TIME = 1000L; - - private static final long MIN_WRITE_QUIET_TIME = 1000L; - - private static final int MAX_TASK_QUEUE_SIZE = 500; - - private static final EventWriteTask[] NONE = {}; - - private List tasks = new ArrayList(); - - private VirtualGraph virtualGraph; - private Resource eventLog; - private EventSourceResolver resolver; - - private AtomicBoolean scheduled = new AtomicBoolean(false); - private AtomicBoolean polling = new AtomicBoolean(true); - - private long lastWriteTime = Long.MIN_VALUE; - private long lastQueueTime = Long.MIN_VALUE; - - public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) { - super("Event Writer"); - setPriority(DECORATE); - setUser(false); - setSystem(false); - this.virtualGraph = virtualGraph; - this.eventLog = eventLog; - this.resolver = resolver; - } - - public void dispose() { - if (DEBUG) - System.out.println(this + ": DISPOSE"); - polling.set(false); - } - - public void queue(EventWriteTask task) { - synchronized (tasks) { - tasks.add(task); - this.lastQueueTime = System.currentTimeMillis(); - - if (scheduled.compareAndSet(false, true)) { - schedule(WRITE_SCHEDULE_PERIOD); - } - } - } - - @Override - public boolean shouldRun() { - return polling.get() && Simantics.peekSession() != null; - } - - @Override - protected IStatus run(final IProgressMonitor monitor) { - try { - Session session = Simantics.peekSession(); - if (session == null) - return Status.CANCEL_STATUS; - - int taskCount = countTasks(); - if (taskCount == 0) - return Status.CANCEL_STATUS; - - long currentTime = System.currentTimeMillis(); - - // Must flush write at some point if no flush has been performed yet. - if (lastWriteTime == Long.MIN_VALUE) - lastWriteTime = currentTime; - - long queueQuietTime = currentTime - lastQueueTime; - long writeQuietTime = currentTime - lastWriteTime; - boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME; - boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME; - boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE; - - if (DEBUG) { - System.out.println("Queue quiet time: " + queueQuietTime); - System.out.println("Write quiet time: " + writeQuietTime); - System.out.println("Task count: " + taskCount); - } - - if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) { - return Status.CANCEL_STATUS; - } - - if (DEBUG) { - if (queueQuietTimePassed) - System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime); - if (writeQuietTimePassed) - System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime); - if (taskCountExceeded) - System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount); - } - - final EventWriteTask[] tasks = pruneTasks(); - if (tasks.length == 0) - return Status.CANCEL_STATUS; - - if (DEBUG) - System.out.println(this + ": about to write " + tasks.length + " events"); - setThread(Thread.currentThread()); - - monitor.beginTask("Flush Events", tasks.length); - - try { - - final EventWriteData writeData = session.syncRequest(new UniqueRead() { - - @Override - public EventWriteData perform(ReadGraph graph) throws DatabaseException { - return new EventWriteData(graph, eventLog, virtualGraph); - } - - }); - - session.sync(new WriteOnlyRequest(virtualGraph) { - @Override - public void perform(WriteOnlyGraph graph) throws DatabaseException { - - setThread(Thread.currentThread()); - List events = new ArrayList(tasks.length); - for (EventWriteTask task : tasks) { - - writeData.prepareToWrite(graph); - - Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos); - if (event != null) { - events.add(event); - writeData.written(); - } - monitor.worked(1); - - } - - writeData.commit(graph); - - if (resolver != null) { - if (DEBUG) - System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution"); - resolver.queueEvents(events); - } - } - }); - lastWriteTime = System.currentTimeMillis(); - } catch (CancelTransactionException e) { - polling.set(false); - return Status.CANCEL_STATUS; - } catch (DatabaseException e) { - return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e); - } - - return Status.OK_STATUS; - } finally { - monitor.done(); - schedule(WRITE_SCHEDULE_PERIOD); - } - } - - private int countTasks() { - synchronized (tasks) { - return tasks.size(); - } - } - - private EventWriteTask[] pruneTasks() { - synchronized (this.tasks) { - EventWriteTask[] tasks = this.tasks.toArray(NONE); - this.tasks.clear(); - return tasks; - } - } - +/******************************************************************************* + * Copyright (c) 2012 Association for Decentralized Information Management + * in Industry THTH ry. + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Semantum Oy - initial API and implementation + *******************************************************************************/ +package org.simantics.event.writer; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.eclipse.core.runtime.IProgressMonitor; +import org.eclipse.core.runtime.IStatus; +import org.eclipse.core.runtime.Status; +import org.simantics.DatabaseJob; +import org.simantics.Simantics; +import org.simantics.db.ReadGraph; +import org.simantics.db.Resource; +import org.simantics.db.Session; +import org.simantics.db.VirtualGraph; +import org.simantics.db.WriteOnlyGraph; +import org.simantics.db.common.request.UniqueRead; +import org.simantics.db.common.request.WriteOnlyRequest; +import org.simantics.db.exception.CancelTransactionException; +import org.simantics.db.exception.DatabaseException; +import org.simantics.db.request.WriteOnly; +import org.simantics.event.Activator; +import org.simantics.event.util.EventUtils; +import org.simantics.event.util.EventWriteData; + +/** + * A self-scheduling job that queues up {@link EventWriteTask} instances and + * periodically executes the queued up tasks in a single graph {@link WriteOnly} + * request. + * + *

+ * Intended to be constructed and scheduled by experiment implementations. + * + *

+ * Must be disposed when no longer used. + * + * @author Tuukka Lehtonen + * + * @see EventWriteTask + * @see EventSourceResolver + */ +public class EventWriterJob extends DatabaseJob { + + private final boolean DEBUG = false; + + private static final int WRITE_SCHEDULE_PERIOD = 1000; + + private static final long MIN_QUEUE_QUIET_TIME = 1000L; + + private static final long MIN_WRITE_QUIET_TIME = 1000L; + + private static final int MAX_TASK_QUEUE_SIZE = 500; + + private static final EventWriteTask[] NONE = {}; + + private List tasks = new ArrayList(); + + private VirtualGraph virtualGraph; + private Resource eventLog; + private EventSourceResolver resolver; + + private AtomicBoolean scheduled = new AtomicBoolean(false); + private AtomicBoolean polling = new AtomicBoolean(true); + + private long lastWriteTime = Long.MIN_VALUE; + private long lastQueueTime = Long.MIN_VALUE; + + public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) { + super("Event Writer"); + setPriority(DECORATE); + setUser(false); + setSystem(false); + this.virtualGraph = virtualGraph; + this.eventLog = eventLog; + this.resolver = resolver; + } + + public void dispose() { + if (DEBUG) + System.out.println(this + ": DISPOSE"); + polling.set(false); + } + + public void queue(EventWriteTask task) { + synchronized (tasks) { + tasks.add(task); + this.lastQueueTime = System.currentTimeMillis(); + + if (scheduled.compareAndSet(false, true)) { + schedule(WRITE_SCHEDULE_PERIOD); + } + } + } + + @Override + public boolean shouldRun() { + return polling.get() && Simantics.peekSession() != null; + } + + @Override + protected IStatus run(final IProgressMonitor monitor) { + try { + Session session = Simantics.peekSession(); + if (session == null) + return Status.CANCEL_STATUS; + + int taskCount = countTasks(); + if (taskCount == 0) + return Status.CANCEL_STATUS; + + long currentTime = System.currentTimeMillis(); + + // Must flush write at some point if no flush has been performed yet. + if (lastWriteTime == Long.MIN_VALUE) + lastWriteTime = currentTime; + + long queueQuietTime = currentTime - lastQueueTime; + long writeQuietTime = currentTime - lastWriteTime; + boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME; + boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME; + boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE; + + if (DEBUG) { + System.out.println("Queue quiet time: " + queueQuietTime); + System.out.println("Write quiet time: " + writeQuietTime); + System.out.println("Task count: " + taskCount); + } + + if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) { + return Status.CANCEL_STATUS; + } + + if (DEBUG) { + if (queueQuietTimePassed) + System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime); + if (writeQuietTimePassed) + System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime); + if (taskCountExceeded) + System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount); + } + + final EventWriteTask[] tasks = pruneTasks(); + if (tasks.length == 0) + return Status.CANCEL_STATUS; + + if (DEBUG) + System.out.println(this + ": about to write " + tasks.length + " events"); + setThread(Thread.currentThread()); + + monitor.beginTask("Flush Events", tasks.length); + + try { + + final EventWriteData writeData = session.syncRequest(new UniqueRead() { + + @Override + public EventWriteData perform(ReadGraph graph) throws DatabaseException { + return new EventWriteData(graph, eventLog, virtualGraph); + } + + }); + + session.sync(new WriteOnlyRequest(virtualGraph) { + @Override + public void perform(WriteOnlyGraph graph) throws DatabaseException { + + setThread(Thread.currentThread()); + List events = new ArrayList(tasks.length); + for (EventWriteTask task : tasks) { + + writeData.prepareToWrite(graph); + + Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos); + if (event != null) { + events.add(event); + writeData.written(); + } + monitor.worked(1); + + } + + writeData.commit(graph); + + if (resolver != null) { + if (DEBUG) + System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution"); + resolver.queueEvents(events); + } + } + }); + lastWriteTime = System.currentTimeMillis(); + } catch (CancelTransactionException e) { + polling.set(false); + return Status.CANCEL_STATUS; + } catch (DatabaseException e) { + return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e); + } + + return Status.OK_STATUS; + } finally { + monitor.done(); + schedule(WRITE_SCHEDULE_PERIOD); + } + } + + private int countTasks() { + synchronized (tasks) { + return tasks.size(); + } + } + + private EventWriteTask[] pruneTasks() { + synchronized (this.tasks) { + EventWriteTask[] tasks = this.tasks.toArray(NONE); + this.tasks.clear(); + return tasks; + } + } + } \ No newline at end of file