gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Avoid useless reallocation of empty TreeMaps and map iterators
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.Comparator;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 public class MainProgram implements Runnable, Closeable {
27
28         private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
29
30         private static final int CLUSTER_THREADS = 4;
31         private static final int CHUNK_CACHE_SIZE = 100;
32
33         private final GraphClientImpl2 client;
34         private final ExecutorService[] clusterUpdateThreads;
35     private final List<ClusterUpdateOperation>[] updateSchedules;
36
37         private Thread mainProgramThread;
38
39         private boolean alive = true;
40         private Semaphore deathBarrier = new Semaphore(0);
41
42         final ClusterManager clusters;
43
44         private final OperationQueue operationQueue = new OperationQueue(this);
45
46         static class ClusterThreadFactory implements ThreadFactory {
47
48                 final String name;
49                 final boolean daemon;
50
51                 public ClusterThreadFactory(String name, boolean daemon) {
52                         this.name = name;
53                         this.daemon = daemon;
54                 }
55
56                 @Override
57                 public Thread newThread(Runnable r) {
58                         Thread thread = new Thread(r, name);
59                         thread.setDaemon(daemon);
60                         return thread;
61                 }
62         }
63
64         @SuppressWarnings("unchecked")
65         MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
66
67                 this.client = client;
68                 this.clusters = clusters;
69                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
70                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
71                 for(int i=0;i<clusterUpdateThreads.length;i++) {
72                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
73                         updateSchedules[i] = new ArrayList<>();
74                 }
75         }
76
77         void startTransaction(long id) {
78                 operationQueue.startTransaction(id);
79         }
80
81         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
82
83                 @Override
84                 public int compare(ClusterUID o1, ClusterUID o2) {
85                         return Long.compare(o1.second, o2.second);
86                 }
87         };
88
89         @Override
90         public void run() {
91
92                 mainProgramThread = Thread.currentThread();
93
94                 try {
95
96                         TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<>(clusterComparator);
97
98                         main:
99                         while(alive) {
100
101                                 if (!updates.isEmpty())
102                                         updates.clear();
103                                 operationQueue.pumpUpdates(updates);
104
105                                 if(updates.isEmpty()) {
106
107                                         long start = System.nanoTime();
108
109                                         operationQueue.waitFor();
110
111                                         if (!alive)
112                                                 break main;
113
114                                         long duration = System.nanoTime()-start;
115                                         if(duration > 4000000000L) {
116                                                 checkIdle();
117                                         }
118                                 }
119
120 //                              long sss = System.nanoTime();
121
122                                 runUpdates(updates);
123                                 runTasksIfEmpty();
124
125                                 /*
126                                  * Here we are actively processing updates from client.
127                                  * Maintain necessary caching here.
128                                  */
129
130                                 clusters.streamLRU.acquireMutex();
131                                 try {
132                                         swapChunks();
133                                 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
134                                         LOGGER.error("cluster chunk swapping failed", e);
135                                 } finally {
136                                         clusters.streamLRU.releaseMutex();
137                                 }
138                                 clusters.csLRU.acquireMutex();
139                                 try {
140                                         swapCS();
141                                 } catch (Throwable t) {
142                                         throw new IllegalAcornStateException(t);
143                                 } finally {
144                                         clusters.csLRU.releaseMutex();
145                                 }
146
147                                 TimeLogger.log("Performed updates");
148
149                         }
150
151                 } catch (Throwable t) {
152                         LOGGER.error("FATAL: MainProgram died unexpectedly", t);
153                 } finally {
154                         deathBarrier.release();
155                 }
156         }
157
158         @FunctionalInterface
159         static interface MainProgramRunnable {
160                 void run() throws Exception;
161                 default void error(Exception e) {
162                         LOGGER.error("An error occured", e);
163                 }
164                 default void success() {}
165         }
166
167         private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
168
169                 for(int i=0;i<CLUSTER_THREADS;i++)
170                         updateSchedules[i].clear();
171
172                 if (updates.isEmpty())
173                         return;
174
175                 final Semaphore s = new Semaphore(0);
176
177                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
178                         ClusterUID key = entry.getKey();
179                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
180                         updateSchedules[hash].addAll(entry.getValue());
181                 }
182
183                 //                              final AtomicLong elapsed = new AtomicLong(0);
184                 int acquireAmount = 0;
185                 for(int i=0;i<CLUSTER_THREADS;i++) {
186                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
187                         if (!ops.isEmpty()) {
188                                 acquireAmount++;
189                                 clusterUpdateThreads[i].submit(new Callable<Object>() {
190
191                     @Override
192                     public Object call() throws Exception {
193                         //long st = System.nanoTime();
194                         try {
195                             for(ClusterUpdateOperation op : ops) {
196                                 op.run();
197                             }
198                         } finally {
199                             s.release();
200                         }
201                         return null;
202
203 //                          long duration = System.nanoTime()-st;
204 //                          elapsed.addAndGet(duration);
205 //                          double dur = 1e-9*duration;
206 //                          if(dur > 0.05)
207 //                              System.err.println("duration=" + dur + "s. " + ops.size());
208                     }
209                 });
210                         }
211                 }
212
213                 s.acquire(acquireAmount);
214
215         }
216
217         /*
218          * This shall be run when no updates are currently available.
219          */
220         private void runTasksIfEmpty() {
221                 if(operationQueue.isEmpty()) {
222                         List<MainProgramRunnable> todo = new ArrayList<>();
223                         operationQueue.pumpTasks(todo);
224                         for(MainProgramRunnable runnable : todo) {
225                                 try {
226                                         runnable.run();
227                                         runnable.success();
228                                 } catch (Exception e) {
229                                         runnable.error(e);
230                                 }
231                         }
232                 }
233         }
234
235         /*
236          * This gets called when an idle period has been detected
237          */
238         private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
239
240                 // Was this a time-out or a new stream request?
241                 if(operationQueue.isEmpty()) {
242
243                         /*
244                          * We are idling here.
245                          * Flush all caches gradually
246                          */
247
248                         // Write pending cs to disk
249                         boolean written = clusters.csLRU.swapForced();
250                         while(written) {
251                                 if(!operationQueue.isEmpty()) break;
252                                 written = clusters.csLRU.swapForced();
253                         }
254                         // Write pending chunks to disk
255                         written = clusters.streamLRU.swapForced();
256                         while(written) {
257                                 if(!operationQueue.isEmpty()) break;
258                                 written = clusters.streamLRU.swapForced();
259                         }
260                         // Write pending files to disk
261                         written = clusters.fileLRU.swapForced();
262                         while(written) {
263                                 if(!operationQueue.isEmpty()) break;
264                                 written = clusters.fileLRU.swapForced();
265                         }
266                         // Write pending clusters to disk
267                         written = clusters.clusterLRU.swapForced();
268                         while(written) {
269                                 if(!operationQueue.isEmpty()) break;
270                                 written = clusters.clusterLRU.swapForced();
271                         }
272
273                         client.tryMakeSnapshot();
274
275                 }
276
277         }
278
279
280         /*
281          * This schedules tasks to be run in MainProgram thread
282          * Called from other threads than MainProgram thread
283          *
284          */
285         void runIdle(MainProgramRunnable task) {
286                 operationQueue.scheduleTask(task);
287         }
288
289         /*
290          * Mutex for streamLRU is assumed here
291          *
292          */
293         private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
294
295                 // Cache chunks during update operations
296                 while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
297         }
298
299         private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
300
301                 // Cache chunks during update operations
302                 while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
303         }
304
305         /*
306          * Called by DB client write threads
307          */
308         void committed() {
309
310                 ClusterStreamChunk last = operationQueue.commitLast();
311         if (!alive) {
312             LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
313         }
314
315         }
316
317         /*
318          * Called by DB client write threads
319          */
320         void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
321
322                 if (!alive) {
323                 LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
324             }
325
326                 clusters.streamLRU.acquireMutex();
327
328                 try {
329
330                         operationQueue.scheduleUpdate(operation);
331                         swapChunks();
332
333                 } catch (IllegalAcornStateException e) {
334                     throw e;
335                 } catch (Throwable t) {
336                         throw new IllegalAcornStateException(t);
337                 } finally {
338                         clusters.streamLRU.releaseMutex();
339                 }
340
341         }
342
343     @Override
344     public void close() {
345
346         alive = false;
347
348         // This will wake up the sleeping beauty
349         operationQueue.scheduleTask(() -> {});
350
351         try {
352             deathBarrier.acquire();
353         } catch (InterruptedException e) {
354         }
355
356         for (ExecutorService executor : clusterUpdateThreads)
357             executor.shutdown();
358
359         for (int i = 0; i < clusterUpdateThreads.length; i++) {
360             try {
361                 ExecutorService executor  = clusterUpdateThreads[i];
362                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
363                 clusterUpdateThreads[i] = null;
364             } catch (InterruptedException e) {
365                 LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
366             }
367         }
368     }
369
370     void assertMainProgramThread() {
371         assert(Thread.currentThread().equals(mainProgramThread));
372     }
373
374     void assertNoMainProgramThread() {
375         assert(!Thread.currentThread().equals(mainProgramThread));
376     }
377
378 }