]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Merge "Undo did not wait for pending cluster updates to finish."
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23
24 public class MainProgram implements Runnable, Closeable {
25
26         private static final int CLUSTER_THREADS = 4;
27         private static final int CHUNK_CACHE_SIZE = 100;
28
29         private final GraphClientImpl2 client;
30         private final ClusterManager clusters;
31         private final ExecutorService[] clusterUpdateThreads;
32     private final List<ClusterUpdateOperation>[] updateSchedules;
33         
34         private int residentOperationBytes = 0;
35         private long currentChangeSetId = -1;
36         private int nextChunkId = 0;
37         private boolean alive = true;
38         private Semaphore deathBarrier = new Semaphore(0);
39
40         final Semaphore mutex = new Semaphore(1);
41         final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
42
43         static class ClusterThreadFactory implements ThreadFactory {
44
45                 final String name;
46                 final boolean daemon;
47
48                 public ClusterThreadFactory(String name, boolean daemon) {
49                         this.name = name;
50                         this.daemon = daemon;
51                 }
52
53                 @Override
54                 public Thread newThread(Runnable r) {
55                         Thread thread = new Thread(r, name);
56                         thread.setDaemon(daemon);
57                         return thread;
58                 }
59         }
60
61         public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
62
63                 this.client = client;
64                 this.clusters = clusters;
65                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
66                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
67                 for(int i=0;i<clusterUpdateThreads.length;i++) {
68                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
69                         updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
70                 }
71         }
72
73         public void startTransaction(long id) {
74                 currentChangeSetId = id;
75                 nextChunkId = 0;
76         }
77
78         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
79
80                 @Override
81                 public int compare(ClusterUID o1, ClusterUID o2) {
82                         return Long.compare(o1.second, o2.second);
83                 }
84         };
85
86         @Override
87         public void run() {
88                 try {
89
90                         mutex.acquire();
91                         main:
92                         while(alive) {
93
94                                 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
95
96                                 synchronized(MainProgram.this) {
97
98                                         while(!operations.isEmpty() && updates.size() < 100) {
99
100                                                 ClusterStreamChunk chunk = operations.pollFirst();
101
102                                                 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
103                                                         ClusterUpdateOperation o = chunk.operations.get(i);
104                                                         ClusterUID uid = o.uid;
105                                                         List<ClusterUpdateOperation> ops = updates.get(uid);
106                                                         if(ops == null) {
107                                                                 ops = new ArrayList<ClusterUpdateOperation>();
108                                                                 updates.put(uid, ops);
109                                                         }
110                                                         ops.add(o);
111                                                 }
112
113                                                 chunk.nextToProcess = chunk.operations.size();
114
115                                                 if(!chunk.isCommitted()) {
116                                                         assert(operations.isEmpty());
117                                                         operations.add(chunk);
118                                                         break;
119                                                 }
120
121                                         }
122
123                                         if(updates.isEmpty()) {
124                                                 try {
125                                                         long start = System.nanoTime();
126                                                         mutex.release();
127                                                         MainProgram.this.wait(5000);
128                                                         mutex.acquire();
129                                                         if (!alive)
130                                                                 break main;
131                                                         long duration = System.nanoTime()-start;
132                                                         if(duration > 4000000000L) {
133
134                                                                 // Was this a time-out or a new stream request?
135                                                                 if(operations.isEmpty()) {
136
137                                                                         /*
138                                                                          * We are idling here.
139                                                                          * Flush all caches gradually
140                                                                          */
141
142                                                                         // Write pending cs to disk
143                                                                         boolean written = clusters.csLRU.swapForced();
144                                                                         while(written) {
145                                                                                 if(!updates.isEmpty()) break;
146                                                                                 written = clusters.csLRU.swapForced();
147                                                                         }
148                                                                         // Write pending chunks to disk
149                                                                         written = clusters.streamLRU.swapForced();
150                                                                         while(written) {
151                                                                                 if(!updates.isEmpty()) break;
152                                                                                 written = clusters.streamLRU.swapForced();
153                                                                         }
154                                                                         // Write pending files to disk
155                                                                         written = clusters.fileLRU.swapForced();
156                                                                         while(written) {
157                                                                                 if(!updates.isEmpty()) break;
158                                                                                 written = clusters.fileLRU.swapForced();
159                                                                         }
160                                                                         // Write pending clusters to disk
161                                                                         written = clusters.clusterLRU.swapForced();
162                                                                         while(written) {
163                                                                                 if(!updates.isEmpty()) break;
164                                                                                 written = clusters.clusterLRU.swapForced();
165                                                                         }
166
167                                                                         client.tryMakeSnapshot();
168                                                                 }
169                                                         }
170                                                 } catch (InterruptedException e) {
171                                                         e.printStackTrace();
172                                                 }
173                                         }
174                                 }
175
176 //                              long sss = System.nanoTime();
177
178                                 for(int i=0;i<CLUSTER_THREADS;i++)
179                                         updateSchedules[i].clear();
180
181                                 final Semaphore s = new Semaphore(0);
182
183                                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
184                                         ClusterUID key = entry.getKey();
185                                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
186                                         updateSchedules[hash].addAll(entry.getValue());
187                                 }
188
189                                 //                              final AtomicLong elapsed = new AtomicLong(0);
190                                 int acquireAmount = 0;
191                                 for(int i=0;i<CLUSTER_THREADS;i++) {
192                                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
193                                         if (!ops.isEmpty()) {
194                                                 acquireAmount++;
195                                                 clusterUpdateThreads[i].submit(new Callable<Object>() {
196
197                             @Override
198                             public Object call() throws Exception {
199                                 //long st = System.nanoTime();
200                                 try {
201                                     for(ClusterUpdateOperation op : ops) {
202                                         op.run();
203                                     }
204                                 } finally {
205                                     s.release();
206                                 }
207                                 return null;
208                                 
209     //                          long duration = System.nanoTime()-st;
210     //                          elapsed.addAndGet(duration);
211     //                          double dur = 1e-9*duration;
212     //                          if(dur > 0.05)
213     //                              System.err.println("duration=" + dur + "s. " + ops.size());
214                             }
215                         });
216                                         }
217                                 }
218
219                                 s.acquire(acquireAmount);
220
221                                 /*
222                                  * Here we are actively processing updates from client.
223                                  * Maintain necessary caching here.
224                                  */
225
226                                 clusters.streamLRU.acquireMutex();
227                                 try {
228                                         swapChunks();
229                                 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
230                                     e.printStackTrace();
231                                 } finally {
232                                         clusters.streamLRU.releaseMutex();
233                                 }
234                                 clusters.csLRU.acquireMutex();
235                                 try {
236                                         swapCS();
237                                 } catch (Throwable t) {
238                                         throw new IllegalAcornStateException(t);
239                                 } finally {
240                                         clusters.csLRU.releaseMutex();
241                                 }
242
243                                 TimeLogger.log("Performed updates");
244
245                         }
246
247                 } catch (Throwable t) {
248                         t.printStackTrace();
249                 } finally {
250                         deathBarrier.release();
251                 }
252         }
253         
254         static interface MainProgramRunnable {
255                 
256                 public void run() throws Exception;
257                 public void done();
258                 
259         }
260
261         public Exception runIdle(MainProgramRunnable runnable) {
262                 try {
263                         mutex.acquire();
264                         runnable.run();
265                         return null;
266                 } catch (Exception e) {
267                         return e;
268                 } finally {
269                         runnable.done();
270                         mutex.release();
271                 }
272         }
273         
274         /*
275          * Mutex for streamLRU is assumed here
276          * 
277          */
278         private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
279
280                 // Cache chunks during update operations
281                 boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
282                 while(written) {
283                         written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
284                 }
285         }
286
287         private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
288
289                 // Cache chunks during update operations
290                 boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
291                 while(written) {
292                         written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
293                 }
294         }
295
296         public synchronized void committed() {
297
298                 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
299         if (!alive) {
300             System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
301 //          return;
302         }
303                 if(last != null) last.commit();
304
305         }
306
307         public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
308             if (!alive) {
309                 System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
310 //              return;
311             }
312                 clusters.streamLRU.acquireMutex();
313
314                 try {
315
316                         ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
317                         if(last == null || last.isCommitted()) {
318                                 String id = "" + currentChangeSetId + "-" + nextChunkId++;
319                                 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
320                                 operations.add(last);
321                         }
322
323                         String chunkId = last.getKey();
324                         int chunkOffset = last.operations.size();
325                         operation.scheduled(chunkId + "." + chunkOffset);
326
327                         last.addOperation(operation);
328
329                         swapChunks();
330
331                         notifyAll();
332                 } catch (IllegalAcornStateException e) {
333                     throw e;
334                 } catch (Throwable t) {
335                         throw new IllegalAcornStateException(t);
336                 } finally {
337                         clusters.streamLRU.releaseMutex();
338                 }
339         }
340
341     @Override
342     public void close() {
343         alive = false;
344         synchronized (this) {
345             notifyAll();
346         }
347         try {
348             deathBarrier.acquire();
349         } catch (InterruptedException e) {
350         }
351
352         for (ExecutorService executor : clusterUpdateThreads)
353             executor.shutdown();
354
355         for (int i = 0; i < clusterUpdateThreads.length; i++) {
356             try {
357                 ExecutorService executor  = clusterUpdateThreads[i];
358                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
359                 clusterUpdateThreads[i] = null;
360             } catch (InterruptedException e) {
361                 e.printStackTrace();
362             }
363         }
364     }
365
366 }