]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.event/src/org/simantics/event/writer/EventWriterJob.java
Migrated source code from Simantics SVN
[simantics/platform.git] / bundles / org.simantics.event / src / org / simantics / event / writer / EventWriterJob.java
1 /*******************************************************************************\r
2  * Copyright (c) 2012 Association for Decentralized Information Management\r
3  * in Industry THTH ry.\r
4  * All rights reserved. This program and the accompanying materials\r
5  * are made available under the terms of the Eclipse Public License v1.0\r
6  * which accompanies this distribution, and is available at\r
7  * http://www.eclipse.org/legal/epl-v10.html\r
8  *\r
9  * Contributors:\r
10  *     Semantum Oy - initial API and implementation\r
11  *******************************************************************************/\r
12 package org.simantics.event.writer;\r
13 \r
14 import java.util.ArrayList;\r
15 import java.util.List;\r
16 import java.util.concurrent.atomic.AtomicBoolean;\r
17 \r
18 import org.eclipse.core.runtime.IProgressMonitor;\r
19 import org.eclipse.core.runtime.IStatus;\r
20 import org.eclipse.core.runtime.Status;\r
21 import org.simantics.DatabaseJob;\r
22 import org.simantics.Simantics;\r
23 import org.simantics.db.ReadGraph;\r
24 import org.simantics.db.Resource;\r
25 import org.simantics.db.Session;\r
26 import org.simantics.db.VirtualGraph;\r
27 import org.simantics.db.WriteOnlyGraph;\r
28 import org.simantics.db.common.request.UniqueRead;\r
29 import org.simantics.db.common.request.WriteOnlyRequest;\r
30 import org.simantics.db.exception.CancelTransactionException;\r
31 import org.simantics.db.exception.DatabaseException;\r
32 import org.simantics.db.request.WriteOnly;\r
33 import org.simantics.event.Activator;\r
34 import org.simantics.event.util.EventUtils;\r
35 import org.simantics.event.util.EventWriteData;\r
36 \r
37 /**\r
38  * A self-scheduling job that queues up {@link EventWriteTask} instances and\r
39  * periodically executes the queued up tasks in a single graph {@link WriteOnly}\r
40  * request.\r
41  * \r
42  * <p>\r
43  * Intended to be constructed and scheduled by experiment implementations.\r
44  * \r
45  * <p>\r
46  * Must be disposed when no longer used.\r
47  * \r
48  * @author Tuukka Lehtonen\r
49  * \r
50  * @see EventWriteTask\r
51  * @see EventSourceResolver\r
52  */\r
53 public class EventWriterJob extends DatabaseJob {\r
54 \r
55     private final boolean                 DEBUG                 = false;\r
56 \r
57     private static final int              WRITE_SCHEDULE_PERIOD = 1000;\r
58 \r
59     private static final long             MIN_QUEUE_QUIET_TIME  = 1000L;\r
60 \r
61     private static final long             MIN_WRITE_QUIET_TIME  = 1000L;\r
62 \r
63     private static final int              MAX_TASK_QUEUE_SIZE   = 500;\r
64 \r
65     private static final EventWriteTask[] NONE                  = {};\r
66 \r
67     private List<EventWriteTask>          tasks                 = new ArrayList<EventWriteTask>();\r
68 \r
69     private VirtualGraph                  virtualGraph;\r
70     private Resource                      eventLog;\r
71     private EventSourceResolver           resolver;\r
72 \r
73     private AtomicBoolean                 scheduled             = new AtomicBoolean(false);\r
74     private AtomicBoolean                 polling               = new AtomicBoolean(true);\r
75 \r
76     private long                          lastWriteTime         = Long.MIN_VALUE;\r
77     private long                          lastQueueTime         = Long.MIN_VALUE;\r
78 \r
79     public EventWriterJob(VirtualGraph virtualGraph, Resource eventLog, EventSourceResolver resolver) {\r
80         super("Event Writer");\r
81         setPriority(DECORATE);\r
82         setUser(false);\r
83         setSystem(false);\r
84         this.virtualGraph = virtualGraph;\r
85         this.eventLog = eventLog;\r
86         this.resolver = resolver;\r
87     }\r
88 \r
89     public void dispose() {\r
90         if (DEBUG)\r
91             System.out.println(this + ": DISPOSE");\r
92         polling.set(false);\r
93     }\r
94 \r
95     public void queue(EventWriteTask task) {\r
96         synchronized (tasks) {\r
97             tasks.add(task);\r
98             this.lastQueueTime = System.currentTimeMillis();\r
99 \r
100             if (scheduled.compareAndSet(false, true)) {\r
101                 schedule(WRITE_SCHEDULE_PERIOD);\r
102             }\r
103         }\r
104     }\r
105 \r
106     @Override\r
107     public boolean shouldRun() {\r
108         return polling.get() && Simantics.peekSession() != null;\r
109     }\r
110 \r
111     @Override\r
112     protected IStatus run(final IProgressMonitor monitor) {\r
113         try {\r
114             Session session = Simantics.peekSession();\r
115             if (session == null)\r
116                 return Status.CANCEL_STATUS;\r
117 \r
118             int taskCount = countTasks();\r
119             if (taskCount == 0)\r
120                 return Status.CANCEL_STATUS;\r
121 \r
122             long currentTime = System.currentTimeMillis();\r
123 \r
124             // Must flush write at some point if no flush has been performed yet.\r
125             if (lastWriteTime == Long.MIN_VALUE)\r
126                 lastWriteTime = currentTime;\r
127 \r
128             long queueQuietTime = currentTime - lastQueueTime;\r
129             long writeQuietTime = currentTime - lastWriteTime;\r
130             boolean queueQuietTimePassed = queueQuietTime >= MIN_QUEUE_QUIET_TIME;\r
131             boolean writeQuietTimePassed = writeQuietTime >= MIN_WRITE_QUIET_TIME;\r
132             boolean taskCountExceeded = taskCount >= MAX_TASK_QUEUE_SIZE;\r
133 \r
134             if (DEBUG) {\r
135                 System.out.println("Queue quiet time: " + queueQuietTime);\r
136                 System.out.println("Write quiet time: " + writeQuietTime);\r
137                 System.out.println("Task count: " + taskCount);\r
138             }\r
139 \r
140             if (!writeQuietTimePassed && !queueQuietTimePassed && !taskCountExceeded) {\r
141                 return Status.CANCEL_STATUS;\r
142             }\r
143 \r
144             if (DEBUG) {\r
145                 if (queueQuietTimePassed)\r
146                     System.out.println("Queue quiet time (" + MIN_QUEUE_QUIET_TIME + ") exceeded: " + queueQuietTime);\r
147                 if (writeQuietTimePassed)\r
148                     System.out.println("Write quiet time (" + MIN_WRITE_QUIET_TIME + ") exceeded: " + writeQuietTime);\r
149                 if (taskCountExceeded)\r
150                     System.out.println("Maximum queued task count (" + MAX_TASK_QUEUE_SIZE + ") exceeded: " + taskCount);\r
151             }\r
152 \r
153             final EventWriteTask[] tasks = pruneTasks();\r
154             if (tasks.length == 0)\r
155                 return Status.CANCEL_STATUS;\r
156 \r
157             if (DEBUG)\r
158                 System.out.println(this + ": about to write " + tasks.length + " events");\r
159             setThread(Thread.currentThread());\r
160 \r
161             monitor.beginTask("Flush Events", tasks.length);\r
162             \r
163             try {\r
164                 \r
165                 final EventWriteData writeData = session.syncRequest(new UniqueRead<EventWriteData>() {\r
166 \r
167                         @Override\r
168                         public EventWriteData perform(ReadGraph graph) throws DatabaseException {\r
169                                 return new EventWriteData(graph, eventLog, virtualGraph);\r
170                         }\r
171                         \r
172                 });     \r
173 \r
174                 session.sync(new WriteOnlyRequest(virtualGraph) {\r
175                     @Override\r
176                     public void perform(WriteOnlyGraph graph) throws DatabaseException {\r
177                         \r
178                         setThread(Thread.currentThread());\r
179                         List<Resource> events = new ArrayList<Resource>(tasks.length);\r
180                         for (EventWriteTask task : tasks) {\r
181 \r
182                                 writeData.prepareToWrite(graph);\r
183                             \r
184                             Resource event = task.write(graph, writeData.targetSlice, writeData.targetPos);\r
185                             if (event != null) {\r
186                                 events.add(event);\r
187                                 writeData.written();\r
188                             }\r
189                             monitor.worked(1);\r
190                             \r
191                         }\r
192                         \r
193                         writeData.commit(graph);\r
194                         \r
195                         if (resolver != null) {\r
196                             if (DEBUG)\r
197                                 System.out.println(EventWriterJob.this + ": queue " + events.size() + " for source resolution");\r
198                             resolver.queueEvents(events);\r
199                         }\r
200                     }\r
201                 });\r
202                 lastWriteTime = System.currentTimeMillis();\r
203             } catch (CancelTransactionException e) {\r
204                 polling.set(false);\r
205                 return Status.CANCEL_STATUS;\r
206             } catch (DatabaseException e) {\r
207                 return new Status(IStatus.ERROR, Activator.BUNDLE_ID, "Event writing failed", e);\r
208             }\r
209 \r
210             return Status.OK_STATUS;\r
211         } finally {\r
212             monitor.done();\r
213             schedule(WRITE_SCHEDULE_PERIOD);\r
214         }\r
215     }\r
216 \r
217     private int countTasks() {\r
218         synchronized (tasks) {\r
219             return tasks.size();\r
220         }\r
221     }\r
222 \r
223     private EventWriteTask[] pruneTasks() {\r
224         synchronized (this.tasks) {\r
225             EventWriteTask[] tasks = this.tasks.toArray(NONE);\r
226             this.tasks.clear();\r
227             return tasks;\r
228         }\r
229     }\r
230 \r
231 }