gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Merge "Replace embedded pdfbox jars with external plug-ins."
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Semaphore;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.TimeUnit;
15
16 import org.simantics.acorn.lru.ClusterStreamChunk;
17 import org.simantics.acorn.lru.ClusterUpdateOperation;
18 import org.simantics.db.service.ClusterUID;
19 import org.simantics.utils.logging.TimeLogger;
20
21 public class MainProgram implements Runnable, Closeable {
22
23         private static final int CLUSTER_THREADS = 4;
24         private static final int CHUNK_CACHE_SIZE = 100;
25
26         private final GraphClientImpl2 client;
27         private final ClusterManager clusters;
28         private final ExecutorService[] clusterUpdateThreads;
29     private final List<ClusterUpdateOperation>[] updateSchedules;
30         
31         private int residentOperationBytes = 0;
32         private long currentChangeSetId = -1;
33         private int nextChunkId = 0;
34         private boolean alive = true;
35         private Semaphore deathBarrier = new Semaphore(0);
36
37         final Semaphore mutex = new Semaphore(1);
38         final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
39
40         static class ClusterThreadFactory implements ThreadFactory {
41
42                 final String name;
43                 final boolean daemon;
44
45                 public ClusterThreadFactory(String name, boolean daemon) {
46                         this.name = name;
47                         this.daemon = daemon;
48                 }
49
50                 @Override
51                 public Thread newThread(Runnable r) {
52                         Thread thread = new Thread(r, name);
53                         thread.setDaemon(daemon);
54                         return thread;
55                 }
56         }
57
58         public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
59
60                 this.client = client;
61                 this.clusters = clusters;
62                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
63                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
64                 for(int i=0;i<clusterUpdateThreads.length;i++) {
65                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
66                         updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
67                 }
68         }
69
70         public void startTransaction(long id) {
71                 currentChangeSetId = id;
72                 nextChunkId = 0;
73         }
74
75         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
76
77                 @Override
78                 public int compare(ClusterUID o1, ClusterUID o2) {
79                         return Long.compare(o1.second, o2.second);
80                 }
81         };
82
83         @Override
84         public void run() {
85                 try {
86
87                         mutex.acquire();
88                         main:
89                         while(alive) {
90
91                                 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
92
93                                 synchronized(MainProgram.this) {
94
95                                         while(!operations.isEmpty() && updates.size() < 100) {
96
97                                                 ClusterStreamChunk chunk = operations.pollFirst();
98
99                                                 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
100                                                         ClusterUpdateOperation o = chunk.operations.get(i);
101                                                         ClusterUID uid = o.uid;
102                                                         List<ClusterUpdateOperation> ops = updates.get(uid);
103                                                         if(ops == null) {
104                                                                 ops = new ArrayList<ClusterUpdateOperation>();
105                                                                 updates.put(uid, ops);
106                                                         }
107                                                         ops.add(o);
108                                                 }
109
110                                                 chunk.nextToProcess = chunk.operations.size();
111
112                                                 if(!chunk.isCommitted()) {
113                                                         assert(operations.isEmpty());
114                                                         operations.add(chunk);
115                                                         break;
116                                                 }
117
118                                         }
119
120                                         if(updates.isEmpty()) {
121                                                 try {
122                                                         long start = System.nanoTime();
123                                                         mutex.release();
124                                                         MainProgram.this.wait(5000);
125                                                         mutex.acquire();
126                                                         if (!alive)
127                                                                 break main;
128                                                         long duration = System.nanoTime()-start;
129                                                         if(duration > 4000000000L) {
130
131                                                                 // Was this a time-out or a new stream request?
132                                                                 if(operations.isEmpty()) {
133
134                                                                         /*
135                                                                          * We are idling here.
136                                                                          * Flush all caches gradually
137                                                                          */
138
139                                                                         // Write pending cs to disk
140                                                                         boolean written = clusters.csLRU.swapForced();
141                                                                         while(written) {
142                                                                                 if(!updates.isEmpty()) break;
143                                                                                 written = clusters.csLRU.swapForced();
144                                                                         }
145                                                                         // Write pending chunks to disk
146                                                                         written = clusters.streamLRU.swapForced();
147                                                                         while(written) {
148                                                                                 if(!updates.isEmpty()) break;
149                                                                                 written = clusters.streamLRU.swapForced();
150                                                                         }
151                                                                         // Write pending files to disk
152                                                                         written = clusters.fileLRU.swapForced();
153                                                                         while(written) {
154                                                                                 if(!updates.isEmpty()) break;
155                                                                                 written = clusters.fileLRU.swapForced();
156                                                                         }
157                                                                         // Write pending clusters to disk
158                                                                         written = clusters.clusterLRU.swapForced();
159                                                                         while(written) {
160                                                                                 if(!updates.isEmpty()) break;
161                                                                                 written = clusters.clusterLRU.swapForced();
162                                                                         }
163
164                                                                         client.tryMakeSnapshot();
165                                                                 }
166                                                         }
167                                                 } catch (InterruptedException e) {
168                                                         e.printStackTrace();
169                                                 }
170
171                                         }
172
173                                 }
174
175 //                              long sss = System.nanoTime();
176
177                                 for(int i=0;i<CLUSTER_THREADS;i++)
178                                         updateSchedules[i].clear();
179
180                                 final Semaphore s = new Semaphore(0);
181
182                                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
183                                         ClusterUID key = entry.getKey();
184                                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
185                                         updateSchedules[hash].addAll(entry.getValue());
186                                 }
187
188                                 //                              final AtomicLong elapsed = new AtomicLong(0);
189                                 int acquireAmount = 0;
190                                 for(int i=0;i<CLUSTER_THREADS;i++) {
191                                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
192                                         if (!ops.isEmpty()) {
193                                                 acquireAmount++;
194                                                 clusterUpdateThreads[i].execute(() -> {
195
196                                                         //long st = System.nanoTime();
197                                                         for(ClusterUpdateOperation op : ops) {
198                                                                 op.run();
199                                                         }
200                                                         s.release();
201                                                         //                                                      long duration = System.nanoTime()-st;
202                                                         //                                                      elapsed.addAndGet(duration);
203                                                         //                                                      double dur = 1e-9*duration;
204                                                         //                                                      if(dur > 0.05)
205                                                         //                                                              System.err.println("duration=" + dur + "s. " + ops.size());
206                                                 });
207                                         }
208                                 }
209
210                                 s.acquire(acquireAmount);
211
212                                 /*
213                                  * Here we are actively processing updates from client.
214                                  * Maintain necessary caching here.
215                                  */
216
217                                 clusters.streamLRU.acquireMutex();
218                                 try {
219                                         swapChunks();
220                                 } catch (Throwable t) {
221                                         throw new IllegalStateException(t);
222                                 } finally {
223                                         clusters.streamLRU.releaseMutex();
224                                 }
225                                 clusters.csLRU.acquireMutex();
226                                 try {
227                                         swapCS();
228                                 } catch (Throwable t) {
229                                         throw new IllegalStateException(t);
230                                 } finally {
231                                         clusters.csLRU.releaseMutex();
232                                 }
233
234                                 TimeLogger.log("Performed updates");
235
236                         }
237
238                 } catch (Throwable t) {
239                         t.printStackTrace();
240                 } finally {
241                         deathBarrier.release();
242                 }
243
244         }
245
246         /*
247          * Mutex for streamLRU is assumed here
248          * 
249          */
250         private void swapChunks() {
251
252                 // Cache chunks during update operations
253                 boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
254                 while(written) {
255                         written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
256                 }
257
258         }
259
260         private void swapCS() {
261
262                 // Cache chunks during update operations
263                 boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
264                 while(written) {
265                         written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
266                 }
267
268         }
269
270         public synchronized void committed() {
271
272                 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
273         if (!alive) {
274             System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
275 //          return;
276         }
277                 if(last != null) last.commit();
278
279         }
280
281         public synchronized void schedule(ClusterUpdateOperation operation) {
282             if (!alive) {
283                 System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
284 //              return;
285             }
286                 clusters.streamLRU.acquireMutex();
287
288                 try {
289
290                         ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
291                         if(last == null || last.isCommitted()) {
292                                 String id = "" + currentChangeSetId + "-" + nextChunkId++;
293                                 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
294                                 operations.add(last);
295                         }
296
297                         String chunkId = last.getKey();
298                         int chunkOffset = last.operations.size();
299                         operation.scheduled(chunkId + "." + chunkOffset);
300
301                         last.addOperation(operation);
302
303                         swapChunks();
304
305                         notifyAll();
306
307                 } catch (Throwable t) {
308                         throw new IllegalStateException(t);
309                 } finally {
310
311                         clusters.streamLRU.releaseMutex();
312
313                 }
314
315         }
316
317     @Override
318     public void close() {
319         alive = false;
320         synchronized (this) {
321             notifyAll();
322         }
323         try {
324             deathBarrier.acquire();
325         } catch (InterruptedException e) {
326         }
327
328         for (ExecutorService executor : clusterUpdateThreads)
329             executor.shutdown();
330
331         for (int i = 0; i < clusterUpdateThreads.length; i++) {
332             try {
333                 ExecutorService executor  = clusterUpdateThreads[i];
334                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
335                 clusterUpdateThreads[i] = null;
336             } catch (InterruptedException e) {
337                 e.printStackTrace();
338             }
339         }
340     }
341
342 }