/*******************************************************************************
* Copyright (c) 2012 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* Contributors:
* Semantum Oy - initial API and implementation
*******************************************************************************/
package org.simantics.event.writer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.simantics.DatabaseJob;
import org.simantics.Simantics;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.Session;
import org.simantics.db.VirtualGraph;
import org.simantics.db.WriteOnlyGraph;
import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.common.request.WriteOnlyRequest;
import org.simantics.db.exception.CancelTransactionException;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.request.WriteOnly;
import org.simantics.event.Activator;
import org.simantics.event.util.EventUtils;
import org.simantics.event.util.EventWriteData;
/**
* A self-scheduling job that queues up {@link EventWriteTask} instances and
* periodically executes the queued up tasks in a single graph {@link WriteOnly}
* request.
*
*
* Intended to be constructed and scheduled by experiment implementations.
*
*
* Must be disposed when no longer used.
*
* @author Tuukka Lehtonen
*
* @see EventWriteTask
* @see EventSourceResolver
*/
public class EventWriterJob extends DatabaseJob {
private final boolean DEBUG = false;
private static final int WRITE_SCHEDULE_PERIOD = 1000;
private static final long MIN_QUEUE_QUIET_TIME = 1000L;
private static final long MIN_WRITE_QUIET_TIME = 1000L;
private static final int MAX_TASK_QUEUE_SIZE = 500;
private static final EventWriteTask[] NONE = {};
private List tasks = new ArrayList();
private VirtualGraph virtualGraph;
private Resource eventLog;
private EventSourceResolver resolver;
private AtomicBoolean scheduled = new AtomicBoolean(false);
private AtomicBoolean polling = new AtomicBoolean(true);
private long lastWriteTime = Long.MIN_VALUE;
private long lastQueueTime = Long.MIN_VALUE;
public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {
super("Event Writer");
setPriority(DECORATE);
setUser(false);
setSystem(false);
this.virtualGraph = virtualGraph;
this.eventLog = eventLog;
this.resolver = resolver;
}
public void dispose() {
if (DEBUG)
System.out.println(this + ": DISPOSE");
polling.set(false);
}
public void queue(EventWriteTask task) {
synchronized (tasks) {
tasks.add(task);
this.lastQueueTime = System.currentTimeMillis();
if (scheduled.compareAndSet(false, true)) {
schedule(WRITE_SCHEDULE_PERIOD);
}
}
}
@Override
public boolean shouldRun() {
return polling.get() && Simantics.peekSession() != null;
}
@Override
protected IStatus run(final IProgressMonitor monitor) {
try {
Session session = Simantics.peekSession();
if (session == null)
return Status.CANCEL_STATUS;
int taskCount = countTasks();
if (taskCount == 0)
return Status.CANCEL_STATUS;
long currentTime = System.currentTimeMillis();
// Must flush write at some point if no flush has been performed yet.
if (lastWriteTime == Long.MIN_VALUE)
lastWriteTime = currentTime;
long queueQuietTime = currentTime - lastQueueTime;
long writeQuietTime = currentTime - lastWriteTime;
boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;
boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;
boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;
if (DEBUG) {
System.out.println("Queue quiet time: " + queueQuietTime);
System.out.println("Write quiet time: " + writeQuietTime);
System.out.println("Task count: " + taskCount);
}
if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {
return Status.CANCEL_STATUS;
}
if (DEBUG) {
if (queueQuietTimePassed)
System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);
if (writeQuietTimePassed)
System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);
if (taskCountExceeded)
System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);
}
final EventWriteTask[] tasks = pruneTasks();
if (tasks.length == 0)
return Status.CANCEL_STATUS;
if (DEBUG)
System.out.println(this + ": about to write " + tasks.length + " events");
setThread(Thread.currentThread());
monitor.beginTask("Flush Events", tasks.length);
try {
final EventWriteData writeData = session.syncRequest(new UniqueRead() {
@Override
public EventWriteData perform(ReadGraph graph) throws DatabaseException {
return new EventWriteData(graph, eventLog, virtualGraph);
}
});
session.sync(new WriteOnlyRequest(virtualGraph) {
@Override
public void perform(WriteOnlyGraph graph) throws DatabaseException {
setThread(Thread.currentThread());
List events = new ArrayList(tasks.length);
for (EventWriteTask task : tasks) {
writeData.prepareToWrite(graph);
Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);
if (event != null) {
events.add(event);
writeData.written();
}
monitor.worked(1);
}
writeData.commit(graph);
if (resolver != null) {
if (DEBUG)
System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");
resolver.queueEvents(events);
}
}
});
lastWriteTime = System.currentTimeMillis();
} catch (CancelTransactionException e) {
polling.set(false);
return Status.CANCEL_STATUS;
} catch (DatabaseException e) {
return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);
}
return Status.OK_STATUS;
} finally {
monitor.done();
schedule(WRITE_SCHEDULE_PERIOD);
}
}
private int countTasks() {
synchronized (tasks) {
return tasks.size();
}
}
private EventWriteTask[] pruneTasks() {
synchronized (this.tasks) {
EventWriteTask[] tasks = this.tasks.toArray(NONE);
this.tasks.clear();
return tasks;
}
}
}