]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java
Fixed all line endings of the repository
[simantics/platform.git] / bundles / org.simantics.event / src / org / simantics / event / writer / EventWriterJob.java
index fb4638dea811b1bf6739ffd0dd798d1cd3fc396c..6f57e2a55550a430699557d199e0a9b53d271a24 100644 (file)
-/*******************************************************************************\r
- * Copyright (c) 2012 Association for Decentralized Information Management\r
- * in Industry THTH ry.\r
- * All rights reserved. This program and the accompanying materials\r
- * are made available under the terms of the Eclipse Public License v1.0\r
- * which accompanies this distribution, and is available at\r
- * http://www.eclipse.org/legal/epl-v10.html\r
- *\r
- * Contributors:\r
- *     Semantum Oy - initial API and implementation\r
- *******************************************************************************/\r
-package org.simantics.event.writer;\r
-\r
-import java.util.ArrayList;\r
-import java.util.List;\r
-import java.util.concurrent.atomic.AtomicBoolean;\r
-\r
-import org.eclipse.core.runtime.IProgressMonitor;\r
-import org.eclipse.core.runtime.IStatus;\r
-import org.eclipse.core.runtime.Status;\r
-import org.simantics.DatabaseJob;\r
-import org.simantics.Simantics;\r
-import org.simantics.db.ReadGraph;\r
-import org.simantics.db.Resource;\r
-import org.simantics.db.Session;\r
-import org.simantics.db.VirtualGraph;\r
-import org.simantics.db.WriteOnlyGraph;\r
-import org.simantics.db.common.request.UniqueRead;\r
-import org.simantics.db.common.request.WriteOnlyRequest;\r
-import org.simantics.db.exception.CancelTransactionException;\r
-import org.simantics.db.exception.DatabaseException;\r
-import org.simantics.db.request.WriteOnly;\r
-import org.simantics.event.Activator;\r
-import org.simantics.event.util.EventUtils;\r
-import org.simantics.event.util.EventWriteData;\r
-\r
-/**\r
- * A self-scheduling job that queues up {@link EventWriteTask} instances and\r
- * periodically executes the queued up tasks in a single graph {@link WriteOnly}\r
- * request.\r
- * \r
- * <p>\r
- * Intended to be constructed and scheduled by experiment implementations.\r
- * \r
- * <p>\r
- * Must be disposed when no longer used.\r
- * \r
- * @author Tuukka Lehtonen\r
- * \r
- * @see EventWriteTask\r
- * @see EventSourceResolver\r
- */\r
-public class EventWriterJob extends DatabaseJob {\r
-\r
-    private final boolean                 DEBUG                 = false;\r
-\r
-    private static final int              WRITE_SCHEDULE_PERIOD = 1000;\r
-\r
-    private static final long             MIN_QUEUE_QUIET_TIME  = 1000L;\r
-\r
-    private static final long             MIN_WRITE_QUIET_TIME  = 1000L;\r
-\r
-    private static final int              MAX_TASK_QUEUE_SIZE   = 500;\r
-\r
-    private static final EventWriteTask[] NONE                  = {};\r
-\r
-    private List<EventWriteTask>          tasks                 = new ArrayList<EventWriteTask>();\r
-\r
-    private VirtualGraph                  virtualGraph;\r
-    private Resource                      eventLog;\r
-    private EventSourceResolver           resolver;\r
-\r
-    private AtomicBoolean                 scheduled             = new AtomicBoolean(false);\r
-    private AtomicBoolean                 polling               = new AtomicBoolean(true);\r
-\r
-    private long                          lastWriteTime         = Long.MIN_VALUE;\r
-    private long                          lastQueueTime         = Long.MIN_VALUE;\r
-\r
-    public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {\r
-        super("Event Writer");\r
-        setPriority(DECORATE);\r
-        setUser(false);\r
-        setSystem(false);\r
-        this.virtualGraph = virtualGraph;\r
-        this.eventLog = eventLog;\r
-        this.resolver = resolver;\r
-    }\r
-\r
-    public void dispose() {\r
-        if (DEBUG)\r
-            System.out.println(this + ": DISPOSE");\r
-        polling.set(false);\r
-    }\r
-\r
-    public void queue(EventWriteTask task) {\r
-        synchronized (tasks) {\r
-            tasks.add(task);\r
-            this.lastQueueTime = System.currentTimeMillis();\r
-\r
-            if (scheduled.compareAndSet(false, true)) {\r
-                schedule(WRITE_SCHEDULE_PERIOD);\r
-            }\r
-        }\r
-    }\r
-\r
-    @Override\r
-    public boolean shouldRun() {\r
-        return polling.get() && Simantics.peekSession() != null;\r
-    }\r
-\r
-    @Override\r
-    protected IStatus run(final IProgressMonitor monitor) {\r
-        try {\r
-            Session session = Simantics.peekSession();\r
-            if (session == null)\r
-                return Status.CANCEL_STATUS;\r
-\r
-            int taskCount = countTasks();\r
-            if (taskCount == 0)\r
-                return Status.CANCEL_STATUS;\r
-\r
-            long currentTime = System.currentTimeMillis();\r
-\r
-            // Must flush write at some point if no flush has been performed yet.\r
-            if (lastWriteTime == Long.MIN_VALUE)\r
-                lastWriteTime = currentTime;\r
-\r
-            long queueQuietTime = currentTime - lastQueueTime;\r
-            long writeQuietTime = currentTime - lastWriteTime;\r
-            boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;\r
-            boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;\r
-            boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;\r
-\r
-            if (DEBUG) {\r
-                System.out.println("Queue quiet time: " + queueQuietTime);\r
-                System.out.println("Write quiet time: " + writeQuietTime);\r
-                System.out.println("Task count: " + taskCount);\r
-            }\r
-\r
-            if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {\r
-                return Status.CANCEL_STATUS;\r
-            }\r
-\r
-            if (DEBUG) {\r
-                if (queueQuietTimePassed)\r
-                    System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);\r
-                if (writeQuietTimePassed)\r
-                    System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);\r
-                if (taskCountExceeded)\r
-                    System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);\r
-            }\r
-\r
-            final EventWriteTask[] tasks = pruneTasks();\r
-            if (tasks.length == 0)\r
-                return Status.CANCEL_STATUS;\r
-\r
-            if (DEBUG)\r
-                System.out.println(this + ": about to write " + tasks.length + " events");\r
-            setThread(Thread.currentThread());\r
-\r
-            monitor.beginTask("Flush Events", tasks.length);\r
-            \r
-            try {\r
-               \r
-               final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {\r
-\r
-                       @Override\r
-                       public EventWriteData perform(ReadGraph graph) throws DatabaseException {\r
-                               return new EventWriteData(graph, eventLog, virtualGraph);\r
-                       }\r
-                       \r
-               });     \r
-\r
-                session.sync(new WriteOnlyRequest(virtualGraph) {\r
-                    @Override\r
-                    public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
-                       \r
-                        setThread(Thread.currentThread());\r
-                        List<Resource> events = new ArrayList<Resource>(tasks.length);\r
-                        for (EventWriteTask task : tasks) {\r
-\r
-                               writeData.prepareToWrite(graph);\r
-                            \r
-                            Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);\r
-                            if (event != null) {\r
-                                events.add(event);\r
-                                writeData.written();\r
-                            }\r
-                            monitor.worked(1);\r
-                            \r
-                        }\r
-                        \r
-                        writeData.commit(graph);\r
-                        \r
-                        if (resolver != null) {\r
-                            if (DEBUG)\r
-                                System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");\r
-                            resolver.queueEvents(events);\r
-                        }\r
-                    }\r
-                });\r
-                lastWriteTime = System.currentTimeMillis();\r
-            } catch (CancelTransactionException e) {\r
-                polling.set(false);\r
-                return Status.CANCEL_STATUS;\r
-            } catch (DatabaseException e) {\r
-                return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);\r
-            }\r
-\r
-            return Status.OK_STATUS;\r
-        } finally {\r
-            monitor.done();\r
-            schedule(WRITE_SCHEDULE_PERIOD);\r
-        }\r
-    }\r
-\r
-    private int countTasks() {\r
-        synchronized (tasks) {\r
-            return tasks.size();\r
-        }\r
-    }\r
-\r
-    private EventWriteTask[] pruneTasks() {\r
-        synchronized (this.tasks) {\r
-            EventWriteTask[] tasks = this.tasks.toArray(NONE);\r
-            this.tasks.clear();\r
-            return tasks;\r
-        }\r
-    }\r
-\r
+/*******************************************************************************
+ * Copyright (c) 2012 Association for Decentralized Information Management
+ * in Industry THTH ry.
+ * All rights reserved. This program and the accompanying materials
+ * are made available under the terms of the Eclipse Public License v1.0
+ * which accompanies this distribution, and is available at
+ * http://www.eclipse.org/legal/epl-v10.html
+ *
+ * Contributors:
+ *     Semantum Oy - initial API and implementation
+ *******************************************************************************/
+package org.simantics.event.writer;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.eclipse.core.runtime.IProgressMonitor;
+import org.eclipse.core.runtime.IStatus;
+import org.eclipse.core.runtime.Status;
+import org.simantics.DatabaseJob;
+import org.simantics.Simantics;
+import org.simantics.db.ReadGraph;
+import org.simantics.db.Resource;
+import org.simantics.db.Session;
+import org.simantics.db.VirtualGraph;
+import org.simantics.db.WriteOnlyGraph;
+import org.simantics.db.common.request.UniqueRead;
+import org.simantics.db.common.request.WriteOnlyRequest;
+import org.simantics.db.exception.CancelTransactionException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.request.WriteOnly;
+import org.simantics.event.Activator;
+import org.simantics.event.util.EventUtils;
+import org.simantics.event.util.EventWriteData;
+
+/**
+ * A self-scheduling job that queues up {@link EventWriteTask} instances and
+ * periodically executes the queued up tasks in a single graph {@link WriteOnly}
+ * request.
+ * 
+ * <p>
+ * Intended to be constructed and scheduled by experiment implementations.
+ * 
+ * <p>
+ * Must be disposed when no longer used.
+ * 
+ * @author Tuukka Lehtonen
+ * 
+ * @see EventWriteTask
+ * @see EventSourceResolver
+ */
+public class EventWriterJob extends DatabaseJob {
+
+    private final boolean                 DEBUG                 = false;
+
+    private static final int              WRITE_SCHEDULE_PERIOD = 1000;
+
+    private static final long             MIN_QUEUE_QUIET_TIME  = 1000L;
+
+    private static final long             MIN_WRITE_QUIET_TIME  = 1000L;
+
+    private static final int              MAX_TASK_QUEUE_SIZE   = 500;
+
+    private static final EventWriteTask[] NONE                  = {};
+
+    private List<EventWriteTask>          tasks                 = new ArrayList<EventWriteTask>();
+
+    private VirtualGraph                  virtualGraph;
+    private Resource                      eventLog;
+    private EventSourceResolver           resolver;
+
+    private AtomicBoolean                 scheduled             = new AtomicBoolean(false);
+    private AtomicBoolean                 polling               = new AtomicBoolean(true);
+
+    private long                          lastWriteTime         = Long.MIN_VALUE;
+    private long                          lastQueueTime         = Long.MIN_VALUE;
+
+    public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {
+        super("Event Writer");
+        setPriority(DECORATE);
+        setUser(false);
+        setSystem(false);
+        this.virtualGraph = virtualGraph;
+        this.eventLog = eventLog;
+        this.resolver = resolver;
+    }
+
+    public void dispose() {
+        if (DEBUG)
+            System.out.println(this + ": DISPOSE");
+        polling.set(false);
+    }
+
+    public void queue(EventWriteTask task) {
+        synchronized (tasks) {
+            tasks.add(task);
+            this.lastQueueTime = System.currentTimeMillis();
+
+            if (scheduled.compareAndSet(false, true)) {
+                schedule(WRITE_SCHEDULE_PERIOD);
+            }
+        }
+    }
+
+    @Override
+    public boolean shouldRun() {
+        return polling.get() && Simantics.peekSession() != null;
+    }
+
+    @Override
+    protected IStatus run(final IProgressMonitor monitor) {
+        try {
+            Session session = Simantics.peekSession();
+            if (session == null)
+                return Status.CANCEL_STATUS;
+
+            int taskCount = countTasks();
+            if (taskCount == 0)
+                return Status.CANCEL_STATUS;
+
+            long currentTime = System.currentTimeMillis();
+
+            // Must flush write at some point if no flush has been performed yet.
+            if (lastWriteTime == Long.MIN_VALUE)
+                lastWriteTime = currentTime;
+
+            long queueQuietTime = currentTime - lastQueueTime;
+            long writeQuietTime = currentTime - lastWriteTime;
+            boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;
+            boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;
+            boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;
+
+            if (DEBUG) {
+                System.out.println("Queue quiet time: " + queueQuietTime);
+                System.out.println("Write quiet time: " + writeQuietTime);
+                System.out.println("Task count: " + taskCount);
+            }
+
+            if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {
+                return Status.CANCEL_STATUS;
+            }
+
+            if (DEBUG) {
+                if (queueQuietTimePassed)
+                    System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);
+                if (writeQuietTimePassed)
+                    System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);
+                if (taskCountExceeded)
+                    System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);
+            }
+
+            final EventWriteTask[] tasks = pruneTasks();
+            if (tasks.length == 0)
+                return Status.CANCEL_STATUS;
+
+            if (DEBUG)
+                System.out.println(this + ": about to write " + tasks.length + " events");
+            setThread(Thread.currentThread());
+
+            monitor.beginTask("Flush Events", tasks.length);
+            
+            try {
+               
+               final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {
+
+                       @Override
+                       public EventWriteData perform(ReadGraph graph) throws DatabaseException {
+                               return new EventWriteData(graph, eventLog, virtualGraph);
+                       }
+                       
+               });     
+
+                session.sync(new WriteOnlyRequest(virtualGraph) {
+                    @Override
+                    public void perform(WriteOnlyGraph graph) throws DatabaseException {
+                       
+                        setThread(Thread.currentThread());
+                        List<Resource> events = new ArrayList<Resource>(tasks.length);
+                        for (EventWriteTask task : tasks) {
+
+                               writeData.prepareToWrite(graph);
+                            
+                            Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);
+                            if (event != null) {
+                                events.add(event);
+                                writeData.written();
+                            }
+                            monitor.worked(1);
+                            
+                        }
+                        
+                        writeData.commit(graph);
+                        
+                        if (resolver != null) {
+                            if (DEBUG)
+                                System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");
+                            resolver.queueEvents(events);
+                        }
+                    }
+                });
+                lastWriteTime = System.currentTimeMillis();
+            } catch (CancelTransactionException e) {
+                polling.set(false);
+                return Status.CANCEL_STATUS;
+            } catch (DatabaseException e) {
+                return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);
+            }
+
+            return Status.OK_STATUS;
+        } finally {
+            monitor.done();
+            schedule(WRITE_SCHEDULE_PERIOD);
+        }
+    }
+
+    private int countTasks() {
+        synchronized (tasks) {
+            return tasks.size();
+        }
+    }
+
+    private EventWriteTask[] pruneTasks() {
+        synchronized (this.tasks) {
+            EventWriteTask[] tasks = this.tasks.toArray(NONE);
+            this.tasks.clear();
+            return tasks;
+        }
+    }
+
 }
\ No newline at end of file