package org.simantics.acorn;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.simantics.acorn.exception.AcornAccessVerificationException;
import org.simantics.acorn.exception.IllegalAcornStateException;
import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.logging.TimeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

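/**
 * Main writer thread of the Acorn database client.
 *
 * Clients feed {@link ClusterUpdateOperation}s into a queue of
 * {@link ClusterStreamChunk}s with {@link #schedule(ClusterUpdateOperation)}
 * and seal the current chunk with {@link #committed()}. The {@link #run()}
 * loop drains the committed chunks, groups the operations by
 * {@link ClusterUID} and executes them on a small pool of single-threaded
 * executors, flushing the LRU caches to disk when the stream goes idle.
 */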
public class MainProgram implements Runnable, Closeable {

    private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);

    private static final int CLUSTER_THREADS = 4;
    private static final int CHUNK_CACHE_SIZE = 100;

    private final GraphClientImpl2 client;
    private final ClusterManager clusters;
    private final ExecutorService[] clusterUpdateThreads;
    private final List<ClusterUpdateOperation>[] updateSchedules;

    private int residentOperationBytes = 0;
    private long currentChangeSetId = -1;
    private int nextChunkId = 0;
    private boolean alive = true;
    private Semaphore deathBarrier = new Semaphore(0);

    final Semaphore mutex = new Semaphore(1);
    final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();

    static class ClusterThreadFactory implements ThreadFactory {

        final String name;
        final boolean daemon;

        public ClusterThreadFactory(String name, boolean daemon) {
            this.name = name;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, name);
            thread.setDaemon(daemon);
            return thread;
        }
    }

    public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
        this.client = client;
        this.clusters = clusters;
        this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
        this.updateSchedules = new ArrayList[CLUSTER_THREADS];
        for (int i = 0; i < clusterUpdateThreads.length; i++) {
            clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i + 1), false));
            updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
        }
    }

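    /**
     * Begins a new write transaction: chunks created by subsequent
     * {@link #schedule(ClusterUpdateOperation)} calls are keyed by the given
     * change set id, and the per-transaction chunk counter is reset.
     */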
    public void startTransaction(long id) {
        currentChangeSetId = id;
        nextChunkId = 0;
    }

    private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {

        @Override
        public int compare(ClusterUID o1, ClusterUID o2) {
            return Long.compare(o1.second, o2.second);
        }
    };

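    /*
     * Main loop. Each round drains the scheduled operations (at most 100
     * distinct clusters at a time) while holding the monitor, dispatches them
     * to the cluster update threads and waits for completion, and then lets
     * the stream and change-set LRUs swap excess entries to disk. If nothing
     * has been scheduled for roughly five seconds, all caches are flushed and
     * a snapshot is attempted.
     */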
    @Override
    public void run() {
        try {

            mutex.acquire();
            main:
            while (alive) {

                TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);

                synchronized (MainProgram.this) {

                    while (!operations.isEmpty() && updates.size() < 100) {

                        ClusterStreamChunk chunk = operations.pollFirst();

                        for (int i = chunk.nextToProcess; i < chunk.operations.size(); i++) {
                            ClusterUpdateOperation o = chunk.operations.get(i);
                            ClusterUID uid = o.uid;
                            List<ClusterUpdateOperation> ops = updates.get(uid);
                            if (ops == null) {
                                ops = new ArrayList<ClusterUpdateOperation>();
                                updates.put(uid, ops);
                            }
                            ops.add(o);
                        }

                        chunk.nextToProcess = chunk.operations.size();

                        if (!chunk.isCommitted()) {
                            assert (operations.isEmpty());
                            operations.add(chunk);
                            break;
                        }
                    }

                    if (updates.isEmpty()) {
                        try {
                            long start = System.nanoTime();
                            mutex.release();
                            MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed
                            mutex.acquire();
                            if (!alive)
                                break main;
                            long duration = System.nanoTime() - start;
                            if (duration > 4000000000L) {

                                // Was this a time-out or a new stream request?
                                if (operations.isEmpty()) {

                                    /*
                                     * We are idling here.
                                     * Flush all caches gradually.
                                     */

                                    // Write pending change sets to disk
                                    boolean written = clusters.csLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.csLRU.swapForced();
                                    }
                                    // Write pending chunks to disk
                                    written = clusters.streamLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.streamLRU.swapForced();
                                    }
                                    // Write pending files to disk
                                    written = clusters.fileLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.fileLRU.swapForced();
                                    }
                                    // Write pending clusters to disk
                                    written = clusters.clusterLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.clusterLRU.swapForced();
                                    }

                                    client.tryMakeSnapshot();
                                }
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }

//                long sss = System.nanoTime();

                for (int i = 0; i < CLUSTER_THREADS; i++)
                    updateSchedules[i].clear();

                final Semaphore s = new Semaphore(0);

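                // Partition the batched operations across the cluster update
                // threads by ClusterUID hash (the mask works because the thread
                // count is a power of two), so all operations that target the
                // same cluster run in order on the same single-threaded executor.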
                for (Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
                    ClusterUID key = entry.getKey();
                    int hash = key.hashCode() & (clusterUpdateThreads.length - 1);
                    updateSchedules[hash].addAll(entry.getValue());
                }

//                final AtomicLong elapsed = new AtomicLong(0);
                int acquireAmount = 0;
                for (int i = 0; i < CLUSTER_THREADS; i++) {
                    final List<ClusterUpdateOperation> ops = updateSchedules[i];
                    if (!ops.isEmpty()) {
                        acquireAmount++;
                        clusterUpdateThreads[i].submit(new Callable<Object>() {

                            @Override
                            public Object call() throws Exception {
                                //long st = System.nanoTime();
                                try {
                                    for (ClusterUpdateOperation op : ops) {
                                        op.run();
                                    }
                                } finally {
                                    s.release();
                                }
                                return null;

//                                long duration = System.nanoTime()-st;
//                                elapsed.addAndGet(duration);
//                                double dur = 1e-9*duration;
//                                if(dur > 0.05)
//                                    System.err.println("duration=" + dur + "s. " + ops.size());
                            }
                        });
                    }
                }

                s.acquire(acquireAmount);

                /*
                 * Here we are actively processing updates from the client.
                 * Maintain the necessary caching here.
                 */

                clusters.streamLRU.acquireMutex();
                try {
                    swapChunks();
                } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
                    e.printStackTrace();
                } finally {
                    clusters.streamLRU.releaseMutex();
                }
                clusters.csLRU.acquireMutex();
                try {
                    swapCS();
                } catch (Throwable t) {
                    throw new IllegalAcornStateException(t);
                } finally {
                    clusters.csLRU.releaseMutex();
                }

                TimeLogger.log("Performed updates");
            }

        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            deathBarrier.release();
        }
    }

    static interface MainProgramRunnable {

        public void run() throws Exception;
        public void done();

    }

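    /**
     * Runs the given runnable on the calling thread once the main program is
     * idle, i.e. the write mutex is free and no operations are queued. The
     * call retries until it succeeds, logging roughly every 100 ms, and always
     * invokes {@link MainProgramRunnable#done()}; it returns the exception
     * thrown by the runnable, or null on success.
     *
     * A usage sketch (the callback bodies are placeholders):
     *
     * <pre>
     * Exception e = mainProgram.runIdle(new MainProgramRunnable() {
     *     &#64;Override
     *     public void run() throws Exception {
     *         // work that requires a quiescent update stream
     *     }
     *     &#64;Override
     *     public void done() {
     *     }
     * });
     * </pre>
     */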
    public Exception runIdle(MainProgramRunnable runnable) {
        try {
            long startTime = System.currentTimeMillis();
            while (true) {
                boolean hasMutex = false;
                try {
                    synchronized (MainProgram.this) {
                        if (hasMutex = mutex.tryAcquire()) {
                            if (operations.isEmpty()) {
                                runnable.run();
                                return null;
                            }
                        }
                    }
                    long endTime = System.currentTimeMillis();
                    if ((endTime - startTime) > 100) {
                        startTime = endTime;
                        LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
                    }
                } catch (Exception e) {
                    return e;
                } finally {
                    if (hasMutex)
                        mutex.release();
                }
            }
        } finally {
            runnable.done();
        }
    }

    /*
     * The caller must hold the streamLRU mutex.
     */
    private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
        // Cache chunks during update operations
        boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        while (written) {
            written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        }
    }

    /*
     * The caller must hold the csLRU mutex.
     */
    private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
        // Cache change sets during update operations
        boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        while (written) {
            written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        }
    }

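    /**
     * Marks the last scheduled chunk as committed so the main loop may process
     * it fully, and wakes the main loop up.
     */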
    public synchronized void committed() {

        ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
        if (!alive) {
            LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
//            return;
        }
        if (last != null) {
            last.commit();
            notifyAll();
        }
    }

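    /**
     * Appends the given operation to the current uncommitted stream chunk,
     * creating a new chunk named after the current change set and chunk
     * counter when necessary, tags the operation with its position in the
     * chunk, and wakes the main loop up. Excess chunks may be swapped to disk
     * in the process.
     */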
    public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
        if (!alive) {
            LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
//            return;
        }
        clusters.streamLRU.acquireMutex();

        try {

            ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
            if (last == null || last.isCommitted()) {
                String id = "" + currentChangeSetId + "-" + nextChunkId++;
                last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
                operations.add(last);
            }

            String chunkId = last.getKey();
            int chunkOffset = last.operations.size();
            operation.scheduled(chunkId + "." + chunkOffset);

            last.addOperation(operation);

            swapChunks();

            notifyAll();
        } catch (IllegalAcornStateException e) {
            throw e;
        } catch (Throwable t) {
            throw new IllegalAcornStateException(t);
        } finally {
            clusters.streamLRU.releaseMutex();
        }
    }

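    /**
     * Shuts the main program down: signals the main loop to exit, waits for it
     * to finish, then shuts down the cluster update executors, waiting up to
     * 500 ms for each to terminate.
     */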
    @Override
    public void close() {
        alive = false;
        synchronized (this) {
            notifyAll();
        }
        try {
            deathBarrier.acquire();
        } catch (InterruptedException e) {
        }

        for (ExecutorService executor : clusterUpdateThreads)
            executor.shutdown();

        for (int i = 0; i < clusterUpdateThreads.length; i++) {
            try {
                ExecutorService executor = clusterUpdateThreads[i];
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
                clusterUpdateThreads[i] = null;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}