ec4d56c211ad54640d6d6f429d8847e8d6bf7760
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.Comparator;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class MainProgram implements Runnable, Closeable {
27
28         private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
29
30         private static final int CLUSTER_THREADS = 4;
31         private static final int CHUNK_CACHE_SIZE = 100;
32
33         private final GraphClientImpl2 client;
34         private final ExecutorService[] clusterUpdateThreads;
35     private final List<ClusterUpdateOperation>[] updateSchedules;
36
37         private Thread mainProgramThread;
38
39         private boolean alive = true;
40         private Semaphore deathBarrier = new Semaphore(0);
41
42         final ClusterManager clusters;
43
44         private final OperationQueue operationQueue = new OperationQueue(this);
45
46         static class ClusterThreadFactory implements ThreadFactory {
47
48                 final String name;
49                 final boolean daemon;
50
51                 public ClusterThreadFactory(String name, boolean daemon) {
52                         this.name = name;
53                         this.daemon = daemon;
54                 }
55
56                 @Override
57                 public Thread newThread(Runnable r) {
58                         Thread thread = new Thread(r, name);
59                         thread.setDaemon(daemon);
60                         return thread;
61                 }
62         }
63
64         @SuppressWarnings("unchecked")
65         MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
66
67                 this.client = client;
68                 this.clusters = clusters;
69                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
70                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
71                 for(int i=0;i<clusterUpdateThreads.length;i++) {
72                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
73                         updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
74                 }
75         }
76
77         void startTransaction(long id) {
78                 operationQueue.startTransaction(id);
79         }
80
81         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
82
83                 @Override
84                 public int compare(ClusterUID o1, ClusterUID o2) {
85                         return Long.compare(o1.second, o2.second);
86                 }
87         };
88
89         @Override
90         public void run() {
91
92                 mainProgramThread = Thread.currentThread();
93
94                 try {
95
96                         main:
97                         while(alive) {
98
99                                 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
100
101                                 operationQueue.pumpUpdates(updates);
102
103                                 if(updates.isEmpty()) {
104
105                                         long start = System.nanoTime();
106
107                                         operationQueue.waitFor();
108
109                                         if (!alive)
110                                                 break main;
111
112                                         long duration = System.nanoTime()-start;
113                                         if(duration > 4000000000L) {
114                                                 checkIdle();
115                                         }
116                                 }
117
118 //                              long sss = System.nanoTime();
119
120                                 runUpdates(updates);
121                                 runTasksIfEmpty();
122
123                                 /*
124                                  * Here we are actively processing updates from client.
125                                  * Maintain necessary caching here.
126                                  */
127
128                                 clusters.streamLRU.acquireMutex();
129                                 try {
130                                         swapChunks();
131                                 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
132                                         LOGGER.error("cluster chunk swapping failed", e);
133                                 } finally {
134                                         clusters.streamLRU.releaseMutex();
135                                 }
136                                 clusters.csLRU.acquireMutex();
137                                 try {
138                                         swapCS();
139                                 } catch (Throwable t) {
140                                         throw new IllegalAcornStateException(t);
141                                 } finally {
142                                         clusters.csLRU.releaseMutex();
143                                 }
144
145                                 TimeLogger.log("Performed updates");
146
147                         }
148
149                 } catch (Throwable t) {
150                         LOGGER.error("FATAL: MainProgram died unexpectedly", t);
151                 } finally {
152                         deathBarrier.release();
153                 }
154         }
155
156         @FunctionalInterface
157         static interface MainProgramRunnable {
158                 void run() throws Exception;
159                 default void error(Exception e) {
160                         LOGGER.error("An error occured", e);
161                 }
162                 default void success() {}
163         }
164
165         private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
166
167                 for(int i=0;i<CLUSTER_THREADS;i++)
168                         updateSchedules[i].clear();
169
170                 final Semaphore s = new Semaphore(0);
171
172                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
173                         ClusterUID key = entry.getKey();
174                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
175                         updateSchedules[hash].addAll(entry.getValue());
176                 }
177
178                 //                              final AtomicLong elapsed = new AtomicLong(0);
179                 int acquireAmount = 0;
180                 for(int i=0;i<CLUSTER_THREADS;i++) {
181                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
182                         if (!ops.isEmpty()) {
183                                 acquireAmount++;
184                                 clusterUpdateThreads[i].submit(new Callable<Object>() {
185
186                     @Override
187                     public Object call() throws Exception {
188                         //long st = System.nanoTime();
189                         try {
190                             for(ClusterUpdateOperation op : ops) {
191                                 op.run();
192                             }
193                         } finally {
194                             s.release();
195                         }
196                         return null;
197
198 //                          long duration = System.nanoTime()-st;
199 //                          elapsed.addAndGet(duration);
200 //                          double dur = 1e-9*duration;
201 //                          if(dur > 0.05)
202 //                              System.err.println("duration=" + dur + "s. " + ops.size());
203                     }
204                 });
205                         }
206                 }
207
208                 s.acquire(acquireAmount);
209
210         }
211
212         /*
213          * This shall be run when no updates are currently available.
214          */
215         private void runTasksIfEmpty() {
216                 if(operationQueue.isEmpty()) {
217                         List<MainProgramRunnable> todo = new ArrayList<>();
218                         operationQueue.pumpTasks(todo);
219                         for(MainProgramRunnable runnable : todo) {
220                                 try {
221                                         runnable.run();
222                                         runnable.success();
223                                 } catch (Exception e) {
224                                         runnable.error(e);
225                                 }
226                         }
227                 }
228         }
229
230         /*
231          * This gets called when an idle period has been detected
232          */
233         private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
234
235                 // Was this a time-out or a new stream request?
236                 if(operationQueue.isEmpty()) {
237
238                         /*
239                          * We are idling here.
240                          * Flush all caches gradually
241                          */
242
243                         // Write pending cs to disk
244                         boolean written = clusters.csLRU.swapForced();
245                         while(written) {
246                                 if(!operationQueue.isEmpty()) break;
247                                 written = clusters.csLRU.swapForced();
248                         }
249                         // Write pending chunks to disk
250                         written = clusters.streamLRU.swapForced();
251                         while(written) {
252                                 if(!operationQueue.isEmpty()) break;
253                                 written = clusters.streamLRU.swapForced();
254                         }
255                         // Write pending files to disk
256                         written = clusters.fileLRU.swapForced();
257                         while(written) {
258                                 if(!operationQueue.isEmpty()) break;
259                                 written = clusters.fileLRU.swapForced();
260                         }
261                         // Write pending clusters to disk
262                         written = clusters.clusterLRU.swapForced();
263                         while(written) {
264                                 if(!operationQueue.isEmpty()) break;
265                                 written = clusters.clusterLRU.swapForced();
266                         }
267
268                         client.tryMakeSnapshot();
269
270                 }
271
272         }
273
274
275         /*
276          * This schedules tasks to be run in MainProgram thread
277          * Called from other threads than MainProgram thread
278          *
279          */
280         void runIdle(MainProgramRunnable task) {
281                 operationQueue.scheduleTask(task);
282         }
283
284         /*
285          * Mutex for streamLRU is assumed here
286          *
287          */
288         private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
289
290                 // Cache chunks during update operations
291                 while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
292         }
293
294         private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
295
296                 // Cache chunks during update operations
297                 while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
298         }
299
300         /*
301          * Called by DB client write threads
302          */
303         void committed() {
304
305                 ClusterStreamChunk last = operationQueue.commitLast();
306         if (!alive) {
307             LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
308         }
309
310         }
311
312         /*
313          * Called by DB client write threads
314          */
315         void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
316
317                 if (!alive) {
318                 LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
319             }
320
321                 clusters.streamLRU.acquireMutex();
322
323                 try {
324
325                         operationQueue.scheduleUpdate(operation);
326                         swapChunks();
327
328                 } catch (IllegalAcornStateException e) {
329                     throw e;
330                 } catch (Throwable t) {
331                         throw new IllegalAcornStateException(t);
332                 } finally {
333                         clusters.streamLRU.releaseMutex();
334                 }
335
336         }
337
338     @Override
339     public void close() {
340
341         alive = false;
342
343         // This will wake up the sleeping beauty
344         operationQueue.scheduleTask(() -> {});
345
346         try {
347             deathBarrier.acquire();
348         } catch (InterruptedException e) {
349         }
350
351         for (ExecutorService executor : clusterUpdateThreads)
352             executor.shutdown();
353
354         for (int i = 0; i < clusterUpdateThreads.length; i++) {
355             try {
356                 ExecutorService executor  = clusterUpdateThreads[i];
357                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
358                 clusterUpdateThreads[i] = null;
359             } catch (InterruptedException e) {
360                 LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
361             }
362         }
363     }
364
365     void assertMainProgramThread() {
366         assert(Thread.currentThread().equals(mainProgramThread));
367     }
368
369     void assertNoMainProgramThread() {
370         assert(!Thread.currentThread().equals(mainProgramThread));
371     }
372
373 }