gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Fixed Acorn deadlock during snapshotting
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.Comparator;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class MainProgram implements Runnable, Closeable {
27
28         private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
29
30         private static final int CLUSTER_THREADS = 4;
31         private static final int CHUNK_CACHE_SIZE = 100;
32
33         private final GraphClientImpl2 client;
34         private final ExecutorService[] clusterUpdateThreads;
35     private final List<ClusterUpdateOperation>[] updateSchedules;
36
37         private Thread mainProgramThread;
38
39         private boolean alive = true;
40         private Semaphore deathBarrier = new Semaphore(0);
41
42         final ClusterManager clusters;
43
44         private final OperationQueue operationQueue = new OperationQueue(this);
45
46         static class ClusterThreadFactory implements ThreadFactory {
47
48                 final String name;
49                 final boolean daemon;
50
51                 public ClusterThreadFactory(String name, boolean daemon) {
52                         this.name = name;
53                         this.daemon = daemon;
54                 }
55
56                 @Override
57                 public Thread newThread(Runnable r) {
58                         Thread thread = new Thread(r, name);
59                         thread.setDaemon(daemon);
60                         return thread;
61                 }
62         }
63
64         MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
65
66                 this.client = client;
67                 this.clusters = clusters;
68                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
69                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
70                 for(int i=0;i<clusterUpdateThreads.length;i++) {
71                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
72                         updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
73                 }
74         }
75
76         void startTransaction(long id) {
77                 operationQueue.startTransaction(id);
78         }
79
80         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
81
82                 @Override
83                 public int compare(ClusterUID o1, ClusterUID o2) {
84                         return Long.compare(o1.second, o2.second);
85                 }
86         };
87
88         @Override
89         public void run() {
90
91                 mainProgramThread = Thread.currentThread();
92
93                 try {
94
95                         main:
96                         while(alive) {
97
98                                 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
99
100                                 operationQueue.pumpUpdates(updates);
101
102                                 if(updates.isEmpty()) {
103
104                                         long start = System.nanoTime();
105
106                                         operationQueue.waitFor();
107
108                                         if (!alive)
109                                                 break main;
110
111                                         long duration = System.nanoTime()-start;
112                                         if(duration > 4000000000L) {
113                                                 checkIdle();
114                                         }
115                                 }
116
117 //                              long sss = System.nanoTime();
118
119                                 runUpdates(updates);
120                                 runTasksIfEmpty();
121
122                                 /*
123                                  * Here we are actively processing updates from client.
124                                  * Maintain necessary caching here.
125                                  */
126
127                                 clusters.streamLRU.acquireMutex();
128                                 try {
129                                         swapChunks();
130                                 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
131                                     e.printStackTrace();
132                                 } finally {
133                                         clusters.streamLRU.releaseMutex();
134                                 }
135                                 clusters.csLRU.acquireMutex();
136                                 try {
137                                         swapCS();
138                                 } catch (Throwable t) {
139                                         throw new IllegalAcornStateException(t);
140                                 } finally {
141                                         clusters.csLRU.releaseMutex();
142                                 }
143
144                                 TimeLogger.log("Performed updates");
145
146                         }
147
148                 } catch (Throwable t) {
149                         t.printStackTrace();
150                 } finally {
151                         deathBarrier.release();
152                 }
153         }
154
155         @FunctionalInterface
156         static interface MainProgramRunnable {
157                 void run() throws Exception;
158                 default void error(Exception e) {
159                         LOGGER.error("An error occured", e);
160                 }
161                 default void success() {}
162         }
163
164         private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
165
166                 for(int i=0;i<CLUSTER_THREADS;i++)
167                         updateSchedules[i].clear();
168
169                 final Semaphore s = new Semaphore(0);
170
171                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
172                         ClusterUID key = entry.getKey();
173                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
174                         updateSchedules[hash].addAll(entry.getValue());
175                 }
176
177                 //                              final AtomicLong elapsed = new AtomicLong(0);
178                 int acquireAmount = 0;
179                 for(int i=0;i<CLUSTER_THREADS;i++) {
180                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
181                         if (!ops.isEmpty()) {
182                                 acquireAmount++;
183                                 clusterUpdateThreads[i].submit(new Callable<Object>() {
184
185                     @Override
186                     public Object call() throws Exception {
187                         //long st = System.nanoTime();
188                         try {
189                             for(ClusterUpdateOperation op : ops) {
190                                 op.run();
191                             }
192                         } finally {
193                             s.release();
194                         }
195                         return null;
196
197 //                          long duration = System.nanoTime()-st;
198 //                          elapsed.addAndGet(duration);
199 //                          double dur = 1e-9*duration;
200 //                          if(dur > 0.05)
201 //                              System.err.println("duration=" + dur + "s. " + ops.size());
202                     }
203                 });
204                         }
205                 }
206
207                 s.acquire(acquireAmount);
208
209         }
210
211         /*
212          * This shall be run when no updates are currently available.
213          */
214         private void runTasksIfEmpty() {
215                 if(operationQueue.isEmpty()) {
216                         List<MainProgramRunnable> todo = new ArrayList<>();
217                         operationQueue.pumpTasks(todo);
218                         for(MainProgramRunnable runnable : todo) {
219                                 try {
220                                         runnable.run();
221                                         runnable.success();
222                                 } catch (Exception e) {
223                                         runnable.error(e);
224                                 }
225                         }
226                 }
227         }
228
229         /*
230          * This gets called when an idle period has been detected
231          */
232         private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
233
234                 // Was this a time-out or a new stream request?
235                 if(operationQueue.isEmpty()) {
236
237                         /*
238                          * We are idling here.
239                          * Flush all caches gradually
240                          */
241
242                         // Write pending cs to disk
243                         boolean written = clusters.csLRU.swapForced();
244                         while(written) {
245                                 if(!operationQueue.isEmpty()) break;
246                                 written = clusters.csLRU.swapForced();
247                         }
248                         // Write pending chunks to disk
249                         written = clusters.streamLRU.swapForced();
250                         while(written) {
251                                 if(!operationQueue.isEmpty()) break;
252                                 written = clusters.streamLRU.swapForced();
253                         }
254                         // Write pending files to disk
255                         written = clusters.fileLRU.swapForced();
256                         while(written) {
257                                 if(!operationQueue.isEmpty()) break;
258                                 written = clusters.fileLRU.swapForced();
259                         }
260                         // Write pending clusters to disk
261                         written = clusters.clusterLRU.swapForced();
262                         while(written) {
263                                 if(!operationQueue.isEmpty()) break;
264                                 written = clusters.clusterLRU.swapForced();
265                         }
266
267                         client.tryMakeSnapshot();
268
269                 }
270
271         }
272
273
274         /*
275          * This schedules tasks to be run in MainProgram thread
276          * Called from other threads than MainProgram thread
277          *
278          */
279         void runIdle(MainProgramRunnable task) {
280                 operationQueue.scheduleTask(task);
281         }
282
283         /*
284          * Mutex for streamLRU is assumed here
285          *
286          */
287         private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
288
289                 // Cache chunks during update operations
290                 while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
291         }
292
293         private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
294
295                 // Cache chunks during update operations
296                 while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
297         }
298
299         /*
300          * Called by DB client write threads
301          */
302         void committed() {
303
304                 ClusterStreamChunk last = operationQueue.commitLast();
305         if (!alive) {
306             LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
307         }
308
309         }
310
311         /*
312          * Called by DB client write threads
313          */
314         void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
315
316                 if (!alive) {
317                 LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
318             }
319
320                 clusters.streamLRU.acquireMutex();
321
322                 try {
323
324                         operationQueue.scheduleUpdate(operation);
325                         swapChunks();
326
327                 } catch (IllegalAcornStateException e) {
328                     throw e;
329                 } catch (Throwable t) {
330                         throw new IllegalAcornStateException(t);
331                 } finally {
332                         clusters.streamLRU.releaseMutex();
333                 }
334
335         }
336
337     @Override
338     public void close() {
339
340         alive = false;
341
342         // This will wake up the sleeping beauty
343         operationQueue.scheduleTask(() -> {});
344
345         try {
346             deathBarrier.acquire();
347         } catch (InterruptedException e) {
348         }
349
350         for (ExecutorService executor : clusterUpdateThreads)
351             executor.shutdown();
352
353         for (int i = 0; i < clusterUpdateThreads.length; i++) {
354             try {
355                 ExecutorService executor  = clusterUpdateThreads[i];
356                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
357                 clusterUpdateThreads[i] = null;
358             } catch (InterruptedException e) {
359                 e.printStackTrace();
360             }
361         }
362     }
363
364     void assertMainProgramThread() {
365         assert(Thread.currentThread().equals(mainProgramThread));
366     }
367
368     void assertNoMainProgramThread() {
369         assert(!Thread.currentThread().equals(mainProgramThread));
370     }
371
372 }