/*******************************************************************************
 * Copyright (c) 2012 Association for Decentralized Information Management
 * in Industry THTH ry.
 * All rights reserved. This program and the accompanying materials
 * are made available under the terms of the Eclipse Public License v1.0
 * which accompanies this distribution, and is available at
 * http://www.eclipse.org/legal/epl-v10.html
 *
 * Contributors:
 *     Semantum Oy - initial API and implementation
 *******************************************************************************/
package org.simantics.event.writer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

import org.eclipse.core.runtime.IProgressMonitor;
import org.eclipse.core.runtime.IStatus;
import org.eclipse.core.runtime.Status;
import org.simantics.DatabaseJob;
import org.simantics.Simantics;
import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.Session;
import org.simantics.db.VirtualGraph;
import org.simantics.db.WriteOnlyGraph;
import org.simantics.db.common.request.UniqueRead;
import org.simantics.db.common.request.WriteOnlyRequest;
import org.simantics.db.exception.CancelTransactionException;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.request.WriteOnly;
import org.simantics.event.Activator;
import org.simantics.event.util.EventUtils;
import org.simantics.event.util.EventWriteData;

/**
 * A self-scheduling job that queues up {@link EventWriteTask} instances and
 * periodically executes the queued up tasks in a single graph {@link WriteOnly}
 * request.
 *
 * <p>
 * Intended to be constructed and scheduled by experiment implementations.
 *
 * <p>
 * Must be disposed when no longer used.
 *
 * @author Tuukka Lehtonen
 *
 * @see EventWriteTask
 * @see EventSourceResolver
 */
public class EventWriterJob extends DatabaseJob {

    /** Compile-time switch for diagnostic output on {@code System.out}. */
    private static final boolean DEBUG = false;

    /** Delay in milliseconds passed to every {@code schedule(long)} call of this job. */
    private static final int WRITE_SCHEDULE_PERIOD = 1000;

    /** A flush is allowed once at least this many milliseconds have passed since the last {@link #queue(EventWriteTask)} call. */
    private static final long MIN_QUEUE_QUIET_TIME = 1000L;

    /** A flush is allowed once at least this many milliseconds have passed since the last successful write. */
    private static final long MIN_WRITE_QUIET_TIME = 1000L;

    /** A flush is allowed once at least this many tasks are queued. */
    private static final int MAX_TASK_QUEUE_SIZE = 500;

    /** Zero-length array used as the type token for {@code toArray}. */
    private static final EventWriteTask[] NONE = {};

    /** Pending tasks. Also used as the monitor guarding every access to the queue. */
    private final List<EventWriteTask> tasks = new ArrayList<EventWriteTask>();

    private final VirtualGraph virtualGraph;
    private final Resource eventLog;
    private final EventSourceResolver resolver;

    /** Flipped to {@code true} by the first {@link #queue(EventWriteTask)} call, which starts the job. */
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    /** While {@code true}, {@link #shouldRun()} lets the job execute. */
    private final AtomicBoolean polling = new AtomicBoolean(true);

    // Both timestamps are volatile: lastQueueTime is written by queue() callers
    // and read by run() without holding the queue lock, and a plain long gives
    // neither visibility nor atomicity guarantees across threads.
    private volatile long lastWriteTime = Long.MIN_VALUE;
    private volatile long lastQueueTime = Long.MIN_VALUE;

    /**
     * @param virtualGraph the virtual graph handed to the write request and to
     *        {@link EventWriteData}
     * @param eventLog the event log resource handed to {@link EventWriteData}
     * @param resolver receives the resources of the written events after each
     *        flush, or {@code null} to skip that step
     */
    public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {
        super("Event Writer");
        setPriority(DECORATE);
        setUser(false);
        setSystem(false);

        this.virtualGraph = virtualGraph;
        this.eventLog = eventLog;
        this.resolver = resolver;
    }

    /**
     * Clears the polling flag so that {@link #shouldRun()} returns
     * {@code false} from now on. Tasks still in the queue are not written by
     * this method.
     */
    public void dispose() {
        if (DEBUG)
            System.out.println(this + ": DISPOSE");
        polling.set(false);
    }

    /**
     * Appends a task to the write queue and records the time of the call. The
     * first invocation also schedules this job; later invocations rely on the
     * job rescheduling itself at the end of {@link #run(IProgressMonitor)}.
     *
     * @param task the task to execute in a later write request
     */
    public void queue(EventWriteTask task) {
        synchronized (tasks) {
            tasks.add(task);
            this.lastQueueTime = System.currentTimeMillis();

            if (scheduled.compareAndSet(false, true)) {
                schedule(WRITE_SCHEDULE_PERIOD);
            }
        }
    }

    /**
     * @return {@code true} while the job has not been disposed or cancelled by
     *         a {@link CancelTransactionException} and
     *         {@link Simantics#peekSession()} returns a session
     */
    @Override
    public boolean shouldRun() {
        return polling.get() && Simantics.peekSession() != null;
    }

    /**
     * Flushes the queued tasks in one write-only request when at least one of
     * these holds: the queue has been quiet for {@link #MIN_QUEUE_QUIET_TIME},
     * the last write is at least {@link #MIN_WRITE_QUIET_TIME} old, or
     * {@link #MAX_TASK_QUEUE_SIZE} tasks are pending. The job reschedules
     * itself after every invocation, whatever the outcome.
     *
     * @return {@link Status#OK_STATUS} after a successful write,
     *         {@link Status#CANCEL_STATUS} when there was nothing to do or the
     *         transaction was cancelled, or an error status wrapping the
     *         {@link DatabaseException} that made the write fail
     */
    @Override
    protected IStatus run(final IProgressMonitor monitor) {
        try {
            Session session = Simantics.peekSession();
            if (session == null)
                return Status.CANCEL_STATUS;

            int taskCount = countTasks();
            if (taskCount == 0)
                return Status.CANCEL_STATUS;

            long currentTime = System.currentTimeMillis();

            // Must flush write at some point if no flush has been performed yet.
            if (lastWriteTime == Long.MIN_VALUE)
                lastWriteTime = currentTime;

            long queueQuietTime = currentTime - lastQueueTime;
            long writeQuietTime = currentTime - lastWriteTime;
            boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;
            boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;
            boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;

            if (DEBUG) {
                System.out.println("Queue quiet time: " + queueQuietTime);
                System.out.println("Write quiet time: " + writeQuietTime);
                System.out.println("Task count: " + taskCount);
            }

            // None of the flush triggers fired: try again on the next round.
            if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {
                return Status.CANCEL_STATUS;
            }

            if (DEBUG) {
                if (queueQuietTimePassed)
                    System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);
                if (writeQuietTimePassed)
                    System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);
                if (taskCountExceeded)
                    System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);
            }

            // Snapshot and empty the queue. From here on the snapshot is the
            // only reference to these tasks, so they are not retried if the
            // write below fails.
            final EventWriteTask[] pending = pruneTasks();
            if (pending.length == 0)
                return Status.CANCEL_STATUS;

            if (DEBUG)
                System.out.println(this + ": about to write " + pending.length + " events");

            setThread(Thread.currentThread());
            monitor.beginTask("Flush Events", pending.length);

            try {
                final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {
                    @Override
                    public EventWriteData perform(ReadGraph graph) throws DatabaseException {
                        return new EventWriteData(graph, eventLog, virtualGraph);
                    }
                });

                session.sync(new WriteOnlyRequest(virtualGraph) {
                    @Override
                    public void perform(WriteOnlyGraph graph) throws DatabaseException {
                        setThread(Thread.currentThread());
                        List<Resource> events = new ArrayList<Resource>(pending.length);
                        for (EventWriteTask task : pending) {
                            writeData.prepareToWrite(graph);
                            Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);
                            // A task may decline to produce an event; only
                            // actual events are counted as written.
                            if (event != null) {
                                events.add(event);
                                writeData.written();
                            }
                            monitor.worked(1);
                        }

                        writeData.commit(graph);

                        if (resolver != null) {
                            if (DEBUG)
                                System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");
                            resolver.queueEvents(events);
                        }
                    }
                });

                lastWriteTime = System.currentTimeMillis();
            } catch (CancelTransactionException e) {
                // A cancelled transaction stops the job for good.
                polling.set(false);
                return Status.CANCEL_STATUS;
            } catch (DatabaseException e) {
                return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);
            }

            return Status.OK_STATUS;
        } finally {
            monitor.done();
            schedule(WRITE_SCHEDULE_PERIOD);
        }
    }

    /**
     * @return the number of tasks currently queued
     */
    private int countTasks() {
        synchronized (tasks) {
            return tasks.size();
        }
    }

    /**
     * Atomically takes a snapshot of the queue and empties it.
     *
     * @return the tasks that were queued, in insertion order; never {@code null}
     */
    private EventWriteTask[] pruneTasks() {
        synchronized (this.tasks) {
            EventWriteTask[] snapshot = this.tasks.toArray(NONE);
            this.tasks.clear();
            return snapshot;
        }
    }

}