1 package org.simantics.acorn;
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
24 public class MainProgram implements Runnable, Closeable {
// Tunables and mutable state of the cluster-update main loop.
// NOTE(review): this view of the file is sparse (lines elided); comments are
// grounded only in the visible code.
// Number of update partitions/executors. Must stay a power of two: run()
// maps clusters to partitions with hashCode() & (length - 1).
26 private static final int CLUSTER_THREADS = 4;
// Target cache size passed to LRU swap() in swapChunks()/swapCS().
27 private static final int CHUNK_CACHE_SIZE = 100;
29 private final GraphClientImpl2 client;
30 private final ClusterManager clusters;
// One single-thread executor per partition, so all updates for a given
// cluster are executed in order by the same thread.
31 private final ExecutorService[] clusterUpdateThreads;
// Per-partition work lists, rebuilt each round of run().
32 private final List<ClusterUpdateOperation>[] updateSchedules;
34 private int residentOperationBytes = 0;
// Id of the change set currently being written; set by startTransaction()
// and embedded in chunk ids created by schedule().
35 private long currentChangeSetId = -1;
// Running counter used to make chunk ids unique within a change set.
36 private int nextChunkId = 0;
// presumably set to false by close() — the assigning line is elided here.
37 private boolean alive = true;
// Released at the end of run(); close() acquires it to wait for loop exit.
38 private Semaphore deathBarrier = new Semaphore(0);
40 final Semaphore mutex = new Semaphore(1);
// Queue of incoming stream chunks consumed by run(); accessed under
// synchronized(this) in the visible code.
41 final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
// Thread factory producing named threads with a configurable daemon flag for
// the per-partition update executors.
// NOTE(review): the name/daemon fields, the constructor body and the
// newThread return statement are elided in this view.
43 static class ClusterThreadFactory implements ThreadFactory {
48 public ClusterThreadFactory(String name, boolean daemon) {
54 public Thread newThread(Runnable r) {
// All threads from one factory share the same name (no counter is visible).
55 Thread thread = new Thread(r, name);
56 thread.setDaemon(daemon);
// Creates the per-partition single-thread executors and their (initially
// empty) schedule lists. NOTE(review): the assignment of this.client is on an
// elided line.
61 public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
64 this.clusters = clusters;
65 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
// Unchecked generic-array creation; fixed size CLUSTER_THREADS.
66 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
67 for(int i=0;i<clusterUpdateThreads.length;i++) {
// Non-daemon worker threads: close() must shut these executors down.
68 clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
69 updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
// Records the id of the change set now being written; schedule() embeds this
// value in the ids of newly created stream chunks.
73 public void startTransaction(long id) {
74 currentChangeSetId = id;
// Orders clusters by the 'second' component of their UID only.
// NOTE(review): assumes the 'first' component does not differentiate clusters
// in this context — confirm against ClusterUID usage elsewhere.
78 private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
81 public int compare(ClusterUID o1, ClusterUID o2) {
82 return Long.compare(o1.second, o2.second);
// Interior of the main update loop (the run() header is elided in this view).
// Each round: drain queued ClusterStreamChunks and group their operations by
// ClusterUID; if nothing arrived, idle-flush the LRU caches; otherwise fan the
// grouped operations out to the per-partition executors and wait for them.
// NOTE(review): many control-flow lines are elided; comments cover only what
// is visible.
94 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
96 synchronized(MainProgram.this) {
// Batch at most 100 distinct clusters per round.
98 while(!operations.isEmpty() && updates.size() < 100) {
100 ClusterStreamChunk chunk = operations.pollFirst();
// Pick up only operations not yet processed in this chunk.
102 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
103 ClusterUpdateOperation o = chunk.operations.get(i);
104 ClusterUID uid = o.uid;
105 List<ClusterUpdateOperation> ops = updates.get(uid);
107 ops = new ArrayList<ClusterUpdateOperation>();
108 updates.put(uid, ops);
113 chunk.nextToProcess = chunk.operations.size();
// An uncommitted chunk may still receive operations: requeue it. The assert
// relies on only the tail chunk ever being uncommitted.
115 if(!chunk.isCommitted()) {
116 assert(operations.isEmpty());
117 operations.add(chunk);
// No work available: wait for schedule()/committed() to notify, or time out.
123 if(updates.isEmpty()) {
125 long start = System.nanoTime();
127 MainProgram.this.wait(5000);
131 long duration = System.nanoTime()-start;
// Waited nearly the full 5 s timeout (>4 s) — treat as idle, not a wakeup.
132 if(duration > 4000000000L) {
134 // Was this a time-out or a new stream request?
135 if(operations.isEmpty()) {
138 * We are idling here.
139 * Flush all caches gradually
142 // Write pending cs to disk
143 boolean written = clusters.csLRU.swapForced();
// Abort the idle flush as soon as new work shows up.
145 if(!updates.isEmpty()) break;
146 written = clusters.csLRU.swapForced();
148 // Write pending chunks to disk
149 written = clusters.streamLRU.swapForced();
151 if(!updates.isEmpty()) break;
152 written = clusters.streamLRU.swapForced();
154 // Write pending files to disk
155 written = clusters.fileLRU.swapForced();
157 if(!updates.isEmpty()) break;
158 written = clusters.fileLRU.swapForced();
160 // Write pending clusters to disk
161 written = clusters.clusterLRU.swapForced();
163 if(!updates.isEmpty()) break;
164 written = clusters.clusterLRU.swapForced();
// Caches flushed — opportunistically attempt a snapshot.
167 client.tryMakeSnapshot();
// NOTE(review): interrupt is caught here; whether it is re-asserted or
// swallowed is on elided lines — confirm.
170 } catch (InterruptedException e) {
176 // long sss = System.nanoTime();
178 for(int i=0;i<CLUSTER_THREADS;i++)
179 updateSchedules[i].clear();
// Completion semaphore released by the partition tasks below.
181 final Semaphore s = new Semaphore(0);
// Partition operations by cluster so a given cluster is always handled by the
// same executor, preserving per-cluster ordering.
183 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
184 ClusterUID key = entry.getKey();
// Mask works because length is a power of two; a negative hashCode still
// yields an in-range index.
185 int hash = key.hashCode() & (clusterUpdateThreads.length-1);
186 updateSchedules[hash].addAll(entry.getValue());
189 // final AtomicLong elapsed = new AtomicLong(0);
190 int acquireAmount = 0;
191 for(int i=0;i<CLUSTER_THREADS;i++) {
192 final List<ClusterUpdateOperation> ops = updateSchedules[i];
193 if (!ops.isEmpty()) {
195 clusterUpdateThreads[i].submit(new Callable<Object>() {
198 public Object call() throws Exception {
199 //long st = System.nanoTime();
201 for(ClusterUpdateOperation op : ops) {
209 // long duration = System.nanoTime()-st;
210 // elapsed.addAndGet(duration);
211 // double dur = 1e-9*duration;
213 // System.err.println("duration=" + dur + "s. " + ops.size());
// Block until every submitted partition task has signalled completion.
// NOTE(review): the increment of acquireAmount is on an elided line.
219 s.acquire(acquireAmount);
222 * Here we are actively processing updates from client.
223 * Maintain necessary caching here.
226 clusters.streamLRU.acquireMutex();
229 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
// presumably in a finally block — surrounding lines elided.
232 clusters.streamLRU.releaseMutex();
234 clusters.csLRU.acquireMutex();
237 } catch (Throwable t) {
238 throw new IllegalAcornStateException(t);
240 clusters.csLRU.releaseMutex();
243 TimeLogger.log("Performed updates");
247 } catch (Throwable t) {
// Signal close() that the main loop has terminated.
250 deathBarrier.release();
// Callback contract executed by runIdle(); implementations may throw any
// exception, which runIdle() returns rather than propagates.
254 static interface MainProgramRunnable {
256 public void run() throws Exception;
// Runs the given action and returns the exception it threw (null-or-exception
// style) instead of propagating it.
// NOTE(review): the body and the idle-coordination logic are elided in this
// view — confirm the exact contract against the full file.
261 public Exception runIdle(MainProgramRunnable runnable) {
266 } catch (Exception e) {
// Swaps cached stream chunks to disk down to CHUNK_CACHE_SIZE entries.
// Caller must already hold the streamLRU mutex (per the original note below).
275 * Mutex for streamLRU is assumed here
278 private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
280 // Cache chunks during update operations
281 boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Repeated while swap() reports progress; the loop header is elided here.
283 written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Swaps cached change sets to disk down to CHUNK_CACHE_SIZE entries.
// presumably requires the csLRU mutex, mirroring swapChunks() — confirm.
287 private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
289 // Cache chunks during update operations
290 boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Repeated while swap() reports progress; the loop header is elided here.
292 written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Marks the most recently scheduled chunk as committed so run() can consume
// it fully instead of requeueing it.
296 public synchronized void committed() {
298 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
// presumably guarded by an !alive check on an elided line.
300 System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
303 if(last != null) last.commit();
// Appends an update operation to the tail chunk of the stream, starting a new
// chunk when the current one is already committed. Unexpected failures are
// wrapped as IllegalAcornStateException.
// NOTE(review): guard conditions, notify calls and the finally structure are
// on elided lines.
307 public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
// presumably only printed when !alive — guard line elided.
309 System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
312 clusters.streamLRU.acquireMutex();
316 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
// New chunk id encodes the current change set and a running counter.
317 if(last == null || last.isCommitted()) {
318 String id = "" + currentChangeSetId + "-" + nextChunkId++;
319 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
320 operations.add(last);
// Operation id has the form "<chunkId>.<offset within chunk>".
323 String chunkId = last.getKey();
324 int chunkOffset = last.operations.size();
325 operation.scheduled(chunkId + "." + chunkOffset);
327 last.addOperation(operation);
332 } catch (IllegalAcornStateException e) {
// Wrap anything unexpected in the declared exception type, preserving cause.
334 } catch (Throwable t) {
335 throw new IllegalAcornStateException(t);
// presumably in a finally block — surrounding lines elided.
337 clusters.streamLRU.releaseMutex();
// Stops the main loop and shuts down the partition executors.
// NOTE(review): the alive-flag flip, the notify that wakes run(), the
// executor shutdown() call and the catch bodies are on elided lines.
342 public void close() {
344 synchronized (this) {
// Wait for run() to release deathBarrier on its way out.
348 deathBarrier.acquire();
// NOTE(review): interrupt handling here is elided — confirm the thread's
// interrupt status is restored.
349 } catch (InterruptedException e) {
352 for (ExecutorService executor : clusterUpdateThreads)
355 for (int i = 0; i < clusterUpdateThreads.length; i++) {
357 ExecutorService executor = clusterUpdateThreads[i];
// Bounded wait per executor; the reference is dropped regardless of whether
// termination completed in time.
358 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
359 clusterUpdateThreads[i] = null;
360 } catch (InterruptedException e) {