]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.acorn/src/org/simantics/acorn/MainProgram.java
Merge commit '5da68e7'
[simantics/platform.git] / bundles / org.simantics.acorn / src / org / simantics / acorn / MainProgram.java
1 package org.simantics.acorn;
2
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
7 import java.util.List;
8 import java.util.Map;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
16
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23
24 public class MainProgram implements Runnable, Closeable {
25
26         private static final int CLUSTER_THREADS = 4;
27         private static final int CHUNK_CACHE_SIZE = 100;
28
29         private final GraphClientImpl2 client;
30         private final ClusterManager clusters;
31         private final ExecutorService[] clusterUpdateThreads;
32     private final List<ClusterUpdateOperation>[] updateSchedules;
33         
34         private int residentOperationBytes = 0;
35         private long currentChangeSetId = -1;
36         private int nextChunkId = 0;
37         private boolean alive = true;
38         private Semaphore deathBarrier = new Semaphore(0);
39
40         final Semaphore mutex = new Semaphore(1);
41         final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
42
43         static class ClusterThreadFactory implements ThreadFactory {
44
45                 final String name;
46                 final boolean daemon;
47
48                 public ClusterThreadFactory(String name, boolean daemon) {
49                         this.name = name;
50                         this.daemon = daemon;
51                 }
52
53                 @Override
54                 public Thread newThread(Runnable r) {
55                         Thread thread = new Thread(r, name);
56                         thread.setDaemon(daemon);
57                         return thread;
58                 }
59         }
60
61         public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
62
63                 this.client = client;
64                 this.clusters = clusters;
65                 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
66                 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
67                 for(int i=0;i<clusterUpdateThreads.length;i++) {
68                         clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
69                         updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
70                 }
71         }
72
73         public void startTransaction(long id) {
74                 currentChangeSetId = id;
75                 nextChunkId = 0;
76         }
77
78         private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
79
80                 @Override
81                 public int compare(ClusterUID o1, ClusterUID o2) {
82                         return Long.compare(o1.second, o2.second);
83                 }
84         };
85
86         @Override
87         public void run() {
88                 try {
89
90                         mutex.acquire();
91                         main:
92                         while(alive) {
93
94                                 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
95
96                                 synchronized(MainProgram.this) {
97
98                                         while(!operations.isEmpty() && updates.size() < 100) {
99
100                                                 ClusterStreamChunk chunk = operations.pollFirst();
101
102                                                 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
103                                                         ClusterUpdateOperation o = chunk.operations.get(i);
104                                                         ClusterUID uid = o.uid;
105                                                         List<ClusterUpdateOperation> ops = updates.get(uid);
106                                                         if(ops == null) {
107                                                                 ops = new ArrayList<ClusterUpdateOperation>();
108                                                                 updates.put(uid, ops);
109                                                         }
110                                                         ops.add(o);
111                                                 }
112
113                                                 chunk.nextToProcess = chunk.operations.size();
114
115                                                 if(!chunk.isCommitted()) {
116                                                         assert(operations.isEmpty());
117                                                         operations.add(chunk);
118                                                         break;
119                                                 }
120
121                                         }
122
123                                         if(updates.isEmpty()) {
124                                                 try {
125                                                         long start = System.nanoTime();
126                                                         mutex.release();
127                                                         MainProgram.this.wait(5000);
128                                                         mutex.acquire();
129                                                         if (!alive)
130                                                                 break main;
131                                                         long duration = System.nanoTime()-start;
132                                                         if(duration > 4000000000L) {
133
134                                                                 // Was this a time-out or a new stream request?
135                                                                 if(operations.isEmpty()) {
136
137                                                                         /*
138                                                                          * We are idling here.
139                                                                          * Flush all caches gradually
140                                                                          */
141
142                                                                         // Write pending cs to disk
143                                                                         boolean written = clusters.csLRU.swapForced();
144                                                                         while(written) {
145                                                                                 if(!updates.isEmpty()) break;
146                                                                                 written = clusters.csLRU.swapForced();
147                                                                         }
148                                                                         // Write pending chunks to disk
149                                                                         written = clusters.streamLRU.swapForced();
150                                                                         while(written) {
151                                                                                 if(!updates.isEmpty()) break;
152                                                                                 written = clusters.streamLRU.swapForced();
153                                                                         }
154                                                                         // Write pending files to disk
155                                                                         written = clusters.fileLRU.swapForced();
156                                                                         while(written) {
157                                                                                 if(!updates.isEmpty()) break;
158                                                                                 written = clusters.fileLRU.swapForced();
159                                                                         }
160                                                                         // Write pending clusters to disk
161                                                                         written = clusters.clusterLRU.swapForced();
162                                                                         while(written) {
163                                                                                 if(!updates.isEmpty()) break;
164                                                                                 written = clusters.clusterLRU.swapForced();
165                                                                         }
166
167                                                                         client.tryMakeSnapshot();
168                                                                 }
169                                                         }
170                                                 } catch (InterruptedException e) {
171                                                         e.printStackTrace();
172                                                 }
173                                         }
174                                 }
175
176 //                              long sss = System.nanoTime();
177
178                                 for(int i=0;i<CLUSTER_THREADS;i++)
179                                         updateSchedules[i].clear();
180
181                                 final Semaphore s = new Semaphore(0);
182
183                                 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
184                                         ClusterUID key = entry.getKey();
185                                         int hash = key.hashCode() & (clusterUpdateThreads.length-1);
186                                         updateSchedules[hash].addAll(entry.getValue());
187                                 }
188
189                                 //                              final AtomicLong elapsed = new AtomicLong(0);
190                                 int acquireAmount = 0;
191                                 for(int i=0;i<CLUSTER_THREADS;i++) {
192                                         final List<ClusterUpdateOperation> ops = updateSchedules[i];
193                                         if (!ops.isEmpty()) {
194                                                 acquireAmount++;
195                                                 clusterUpdateThreads[i].submit(new Callable<Object>() {
196
197                             @Override
198                             public Object call() throws Exception {
199                                 //long st = System.nanoTime();
200                                 try {
201                                     for(ClusterUpdateOperation op : ops) {
202                                         op.run();
203                                     }
204                                 } finally {
205                                     s.release();
206                                 }
207                                 return null;
208                                 
209     //                          long duration = System.nanoTime()-st;
210     //                          elapsed.addAndGet(duration);
211     //                          double dur = 1e-9*duration;
212     //                          if(dur > 0.05)
213     //                              System.err.println("duration=" + dur + "s. " + ops.size());
214                             }
215                         });
216                                         }
217                                 }
218
219                                 s.acquire(acquireAmount);
220
221                                 /*
222                                  * Here we are actively processing updates from client.
223                                  * Maintain necessary caching here.
224                                  */
225
226                                 clusters.streamLRU.acquireMutex();
227                                 try {
228                                         swapChunks();
229                                 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
230                                     e.printStackTrace();
231                                 } finally {
232                                         clusters.streamLRU.releaseMutex();
233                                 }
234                                 clusters.csLRU.acquireMutex();
235                                 try {
236                                         swapCS();
237                                 } catch (Throwable t) {
238                                         throw new IllegalAcornStateException(t);
239                                 } finally {
240                                         clusters.csLRU.releaseMutex();
241                                 }
242
243                                 TimeLogger.log("Performed updates");
244
245                         }
246
247                 } catch (Throwable t) {
248                         t.printStackTrace();
249                 } finally {
250                         deathBarrier.release();
251                 }
252         }
253
254         /*
255          * Mutex for streamLRU is assumed here
256          * 
257          */
258         private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
259
260                 // Cache chunks during update operations
261                 boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
262                 while(written) {
263                         written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
264                 }
265         }
266
267         private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
268
269                 // Cache chunks during update operations
270                 boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
271                 while(written) {
272                         written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
273                 }
274         }
275
276         public synchronized void committed() {
277
278                 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
279         if (!alive) {
280             System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
281 //          return;
282         }
283                 if(last != null) last.commit();
284
285         }
286
287         public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
288             if (!alive) {
289                 System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
290 //              return;
291             }
292                 clusters.streamLRU.acquireMutex();
293
294                 try {
295
296                         ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
297                         if(last == null || last.isCommitted()) {
298                                 String id = "" + currentChangeSetId + "-" + nextChunkId++;
299                                 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
300                                 operations.add(last);
301                         }
302
303                         String chunkId = last.getKey();
304                         int chunkOffset = last.operations.size();
305                         operation.scheduled(chunkId + "." + chunkOffset);
306
307                         last.addOperation(operation);
308
309                         swapChunks();
310
311                         notifyAll();
312                 } catch (IllegalAcornStateException e) {
313                     throw e;
314                 } catch (Throwable t) {
315                         throw new IllegalAcornStateException(t);
316                 } finally {
317                         clusters.streamLRU.releaseMutex();
318                 }
319         }
320
321     @Override
322     public void close() {
323         alive = false;
324         synchronized (this) {
325             notifyAll();
326         }
327         try {
328             deathBarrier.acquire();
329         } catch (InterruptedException e) {
330         }
331
332         for (ExecutorService executor : clusterUpdateThreads)
333             executor.shutdown();
334
335         for (int i = 0; i < clusterUpdateThreads.length; i++) {
336             try {
337                 ExecutorService executor  = clusterUpdateThreads[i];
338                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
339                 clusterUpdateThreads[i] = null;
340             } catch (InterruptedException e) {
341                 e.printStackTrace();
342             }
343         }
344     }
345
346 }