1 package org.simantics.acorn;
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
26 public class MainProgram implements Runnable, Closeable {
// SLF4J logger for this class.
28 private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
// Number of cluster update executors and of matching schedule lists.
// Keep this a power of two: the dispatch code picks a slot with
// key.hashCode() & (clusterUpdateThreads.length-1).
30 private static final int CLUSTER_THREADS = 4;
// Passed as the second argument of streamLRU.swap(...) and csLRU.swap(...)
// by swapChunks() and swapCS().
31 private static final int CHUNK_CACHE_SIZE = 100;
// Client whose tryMakeSnapshot() is called from the idle-time flush path.
33 private final GraphClientImpl2 client;
// Source of the csLRU, streamLRU, fileLRU and clusterLRU caches used by this class.
34 private final ClusterManager clusters;
// One single-thread executor per slot, created in the constructor.
35 private final ExecutorService[] clusterUpdateThreads;
// updateSchedules[i] holds the operations submitted to clusterUpdateThreads[i];
// every list is cleared and refilled on each processing round.
36 private final List<ClusterUpdateOperation>[] updateSchedules;
// NOTE(review): no read or write of this field is visible in this part of the
// file - confirm whether it is still used.
38 private int residentOperationBytes = 0;
// Id passed to startTransaction(); used as the prefix of newly created chunk ids.
39 private long currentChangeSetId = -1;
// Suffix of the next chunk id; post-incremented each time schedule() creates a chunk.
40 private int nextChunkId = 0;
// NOTE(review): presumably cleared by close() and checked by the processing
// loop and by committed()/schedule() - those statements are not visible; confirm.
41 private boolean alive = true;
// Starts with zero permits: close() acquires one permit and the processing
// code releases one when it finishes.
42 private Semaphore deathBarrier = new Semaphore(0);
// Single-permit semaphore; runIdle() tries to take it with tryAcquire().
44 final Semaphore mutex = new Semaphore(1);
// Pending stream chunks in arrival order: schedule() appends at the tail,
// the processing loop polls from the head.
45 final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
/**
 * Thread factory that creates threads with a fixed name and daemon flag.
 */
47 static class ClusterThreadFactory implements ThreadFactory {
/**
 * @param name   name given to every thread created by this factory
 * @param daemon daemon flag applied to every thread created by this factory
 */
52 public ClusterThreadFactory(String name, boolean daemon) {
/**
 * Creates a thread for the given task, named {@code name} and with its
 * daemon flag set to {@code daemon}.
 *
 * @param r task the new thread will run
 */
58 public Thread newThread(Runnable r) {
59 Thread thread = new Thread(r, name);
60 thread.setDaemon(daemon);
/**
 * Creates the program and its CLUSTER_THREADS update executors.
 *
 * @param client   client this program works for
 * @param clusters cluster manager providing the caches used by this program
 */
65 public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
68 this.clusters = clusters;
69 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
// Java cannot create an array of a parameterized type, hence the raw ArrayList[]
// (this produces an unchecked-conversion warning).
70 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
71 for(int i=0;i<clusterUpdateThreads.length;i++) {
// Each slot gets its own single-thread executor; its thread is non-daemon and
// named "Cluster Updater 1", "Cluster Updater 2", ...
72 clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
73 updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
/**
 * Records the id of the change set that is starting. schedule() uses this
 * value as the prefix of the ids of the chunks it creates.
 *
 * @param id change set id
 */
77 public void startTransaction(long id) {
78 currentChangeSetId = id;
// Orders ClusterUIDs by their 'second' component only. Two UIDs with the same
// 'second' compare as equal, so a TreeMap built with this comparator keeps a
// single entry for them.
82 private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
85 public int compare(ClusterUID o1, ClusterUID o2) {
86 return Long.compare(o1.second, o2.second);
// One processing round: pending operations are grouped per cluster, with
// clusters ordered by clusterComparator.
98 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
100 synchronized(MainProgram.this) {
// Drain queued chunks until the queue is empty or 100 distinct map keys
// (clusters) have been collected.
102 while(!operations.isEmpty() && updates.size() < 100) {
104 ClusterStreamChunk chunk = operations.pollFirst();
// Start at nextToProcess so operations handled in an earlier round are skipped.
106 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
107 ClusterUpdateOperation o = chunk.operations.get(i);
108 ClusterUID uid = o.uid;
109 List<ClusterUpdateOperation> ops = updates.get(uid);
111 ops = new ArrayList<ClusterUpdateOperation>();
112 updates.put(uid, ops);
// Everything currently in the chunk has now been picked up.
117 chunk.nextToProcess = chunk.operations.size();
// An uncommitted chunk can still receive operations through schedule(), so it
// goes back into the queue; it is expected to be the only chunk left.
119 if(!chunk.isCommitted()) {
120 assert(operations.isEmpty());
121 operations.add(chunk);
// Nothing was collected: wait for work, for at most 5000 ms.
127 if(updates.isEmpty()) {
129 long start = System.nanoTime();
131 MainProgram.this.wait(5000); // wake up when new operations are scheduled or the last operation is committed
135 long duration = System.nanoTime()-start;
// 4000000000 ns = 4 s.
136 if(duration > 4000000000L) {
138 // Was this a time-out or a new stream request?
139 if(operations.isEmpty()) {
142 * We are idling here.
143 * Flush all caches gradually
// NOTE(review): 'updates' is only filled by the draining loop above, so inside
// this updates.isEmpty() branch the '!updates.isEmpty()' checks below look like
// they can never be true - possibly 'operations' was intended; confirm.
146 // Write pending cs to disk
147 boolean written = clusters.csLRU.swapForced();
149 if(!updates.isEmpty()) break;
150 written = clusters.csLRU.swapForced();
152 // Write pending chunks to disk
153 written = clusters.streamLRU.swapForced();
155 if(!updates.isEmpty()) break;
156 written = clusters.streamLRU.swapForced();
158 // Write pending files to disk
159 written = clusters.fileLRU.swapForced();
161 if(!updates.isEmpty()) break;
162 written = clusters.fileLRU.swapForced();
164 // Write pending clusters to disk
165 written = clusters.clusterLRU.swapForced();
167 if(!updates.isEmpty()) break;
168 written = clusters.clusterLRU.swapForced();
// After the flushes, ask the client to attempt a snapshot.
171 client.tryMakeSnapshot();
174 } catch (InterruptedException e) {
180 // long sss = System.nanoTime();
// Drop the schedules left over from the previous round.
182 for(int i=0;i<CLUSTER_THREADS;i++)
183 updateSchedules[i].clear();
// Completion signal for this round.
// NOTE(review): presumably each submitted task releases one permit - the
// release call is not visible here; confirm.
185 final Semaphore s = new Semaphore(0);
// Assign every map entry to one executor slot. All operations collected under
// one key go to the same slot, in the order they were collected. The bit mask
// only covers every slot when clusterUpdateThreads.length is a power of two.
187 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
188 ClusterUID key = entry.getKey();
189 int hash = key.hashCode() & (clusterUpdateThreads.length-1);
190 updateSchedules[hash].addAll(entry.getValue());
193 // final AtomicLong elapsed = new AtomicLong(0);
// Number of permits to wait for below.
// NOTE(review): presumably incremented once per submitted task; confirm.
194 int acquireAmount = 0;
195 for(int i=0;i<CLUSTER_THREADS;i++) {
196 final List<ClusterUpdateOperation> ops = updateSchedules[i];
// Slots without work get no task.
197 if (!ops.isEmpty()) {
199 clusterUpdateThreads[i].submit(new Callable<Object>() {
202 public Object call() throws Exception {
203 //long st = System.nanoTime();
205 for(ClusterUpdateOperation op : ops) {
213 // long duration = System.nanoTime()-st;
214 // elapsed.addAndGet(duration);
215 // double dur = 1e-9*duration;
217 // System.err.println("duration=" + dur + "s. " + ops.size());
// Block until acquireAmount permits are available on s.
223 s.acquire(acquireAmount);
226 * Here we are actively processing updates from client.
227 * Maintain necessary caching here.
// Cache maintenance for streamLRU runs between acquireMutex() and releaseMutex().
230 clusters.streamLRU.acquireMutex();
233 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
236 clusters.streamLRU.releaseMutex();
// Same pattern for csLRU; any Throwable is rethrown wrapped in
// IllegalAcornStateException.
238 clusters.csLRU.acquireMutex();
241 } catch (Throwable t) {
242 throw new IllegalAcornStateException(t);
244 clusters.csLRU.releaseMutex();
247 TimeLogger.log("Performed updates");
251 } catch (Throwable t) {
// Hands close() the permit it acquires from deathBarrier.
254 deathBarrier.release();
/**
 * Task type accepted by runIdle(). Unlike java.lang.Runnable, run() may throw
 * a checked exception.
 */
258 static interface MainProgramRunnable {
260 public void run() throws Exception;
/**
 * Tries to take the mutex and checks that no operations are queued, on behalf
 * of the given task.
 *
 * NOTE(review): presumably the task runs once the mutex is held and the queue
 * is empty, with retries otherwise, and the method returns null on success or
 * the caught exception on failure. The statements that would establish this
 * are not visible here; confirm.
 *
 * @param runnable task to run
 * @return an Exception object rather than a thrown exception
 */
265 public Exception runIdle(MainProgramRunnable runnable) {
// Reference point for the "more than 100 ms" check below.
267 long startTime = System.currentTimeMillis();
269 boolean hasMutex = false;
271 synchronized (MainProgram.this) {
// tryAcquire() does not block. The assignment inside the condition is
// deliberate: hasMutex records whether the permit was obtained.
272 if (hasMutex = mutex.tryAcquire()) {
273 if (operations.isEmpty()) {
279 long endTime = System.currentTimeMillis();
// Log once more than 100 ms have passed since startTime.
280 if ((endTime - startTime) > 100) {
282 LOGGER.info("MainProgram.runIdle() retry mutex acquire!");
284 } catch (Exception e) {
297 * Mutex for streamLRU is assumed here
// Calls clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE) repeatedly
// until it returns false.
300 private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
302 // Cache chunks during update operations
// The loop body is intentionally empty: all the work happens in the condition.
303 while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
// Calls clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE) repeatedly
// until it returns false.
306 private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
// NOTE(review): the comment below is the same as in swapChunks(), but this
// method works on csLRU, not streamLRU - confirm the wording.
308 // Cache chunks during update operations
// The loop body is intentionally empty: all the work happens in the condition.
309 while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
/**
 * Called when the current stream of operations has been committed. Runs while
 * holding this object's monitor.
 *
 * NOTE(review): the visible code only looks up the most recently queued chunk
 * and logs an error for the "closed" case. Presumably the chunk is then marked
 * committed and the waiting processing loop is notified; confirm.
 */
312 public synchronized void committed() {
// Newest chunk in the queue, or null when the queue is empty.
314 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
316 LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
/**
 * Appends an operation to the newest chunk in the queue, first creating a new
 * chunk when the queue is empty or its newest chunk is already committed.
 * Runs while holding this object's monitor.
 *
 * @param operation operation to queue
 * @throws IllegalAcornStateException also used to wrap any other Throwable
 *         raised while queueing
 */
326 public synchronized void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
328 LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
// The chunk is created and modified between acquireMutex() and releaseMutex()
// on streamLRU.
331 clusters.streamLRU.acquireMutex();
335 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
336 if(last == null || last.isCommitted()) {
// New chunk id: "<currentChangeSetId>-<nextChunkId>"; the counter is then incremented.
337 String id = "" + currentChangeSetId + "-" + nextChunkId++;
338 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
339 operations.add(last);
// Tag the operation with "<chunk key>.<offset>", where offset is the chunk's
// operation count before this operation is added.
342 String chunkId = last.getKey();
343 int chunkOffset = last.operations.size();
344 operation.scheduled(chunkId + "." + chunkOffset);
346 last.addOperation(operation);
351 } catch (IllegalAcornStateException e) {
353 } catch (Throwable t) {
354 throw new IllegalAcornStateException(t);
356 clusters.streamLRU.releaseMutex();
361 public void close() {
363 synchronized (this) {
367 deathBarrier.acquire();
368 } catch (InterruptedException e) {
371 for (ExecutorService executor : clusterUpdateThreads)
374 for (int i = 0; i < clusterUpdateThreads.length; i++) {
376 ExecutorService executor = clusterUpdateThreads[i];
377 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
378 clusterUpdateThreads[i] = null;
379 } catch (InterruptedException e) {