1 package org.simantics.acorn;
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.ArrayList;
6 import java.util.Comparator;
9 import java.util.TreeMap;
10 import java.util.concurrent.Callable;
11 import java.util.concurrent.ExecutorService;
12 import java.util.concurrent.Executors;
13 import java.util.concurrent.Semaphore;
14 import java.util.concurrent.ThreadFactory;
15 import java.util.concurrent.TimeUnit;
17 import org.simantics.acorn.exception.AcornAccessVerificationException;
18 import org.simantics.acorn.exception.IllegalAcornStateException;
19 import org.simantics.acorn.lru.ClusterStreamChunk;
20 import org.simantics.acorn.lru.ClusterUpdateOperation;
21 import org.simantics.db.service.ClusterUID;
22 import org.simantics.utils.logging.TimeLogger;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
// Runnable/Closeable worker that pumps ClusterUpdateOperations out of an
// OperationQueue, distributes them over a fixed set of single-thread executors
// and, when the queue is idle, forces the ClusterManager's LRU caches to swap.
26 public class MainProgram implements Runnable, Closeable {
28 private static final Logger LOGGER = LoggerFactory.getLogger(MainProgram.class);
// Number of updater executors and of per-executor schedules. runUpdates picks a
// slot with `hashCode() & (length - 1)`, which only spreads over every slot when
// this value is a power of two.
30 private static final int CLUSTER_THREADS = 4;
// Passed as the second argument of streamLRU.swap(...) and csLRU.swap(...).
31 private static final int CHUNK_CACHE_SIZE = 100;
// Used by checkIdle to call tryMakeSnapshot().
33 private final GraphClientImpl2 client;
// One single-thread executor per slot; entries are set to null by close().
34 private final ExecutorService[] clusterUpdateThreads;
// Scratch lists, one per executor, cleared and refilled on every runUpdates call.
35 private final List<ClusterUpdateOperation>[] updateSchedules;
// Thread that executes the main loop; consulted by the assert*Thread helpers.
37 private Thread mainProgramThread;
// NOTE(review): plain (non-volatile) flag. The log messages in this class suggest
// it is consulted by client write threads and changed by close() - confirm that
// cross-thread visibility is guaranteed by some other synchronization.
39 private boolean alive = true;
// Starts with zero permits: the main loop releases one when it ends and close()
// blocks on acquire() until that happens.
40 private Semaphore deathBarrier = new Semaphore(0);
// Owner of the LRU caches (streamLRU, csLRU, fileLRU, clusterLRU) used below.
42 final ClusterManager clusters;
// Queue of pending updates and idle tasks, bound to this instance.
44 private final OperationQueue operationQueue = new OperationQueue(this);
// ThreadFactory whose threads are created with a caller-supplied name and
// daemon flag.
46 static class ClusterThreadFactory implements ThreadFactory {
// name:   thread name handed to every Thread created by newThread
// daemon: value passed to Thread.setDaemon for every created thread
51 public ClusterThreadFactory(String name, boolean daemon) {
// Wraps the runnable in a new Thread carrying the configured name and daemon flag.
57 public Thread newThread(Runnable r) {
58 Thread thread = new Thread(r, name);
59 thread.setDaemon(daemon);
// Creates the program with its cluster manager and builds CLUSTER_THREADS
// single-thread executors plus one empty schedule list per executor.
// The unchecked suppression covers the generic array creation `new ArrayList[...]`.
64 @SuppressWarnings("unchecked")
65 MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
68 this.clusters = clusters;
69 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
70 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
71 for(int i=0;i<clusterUpdateThreads.length;i++) {
// Non-daemon worker threads named "Cluster Updater 1" .. "Cluster Updater N".
72 clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
73 updateSchedules[i] = new ArrayList<>();
// Forwards the transaction id to the operation queue.
77 void startTransaction(long id) {
78 operationQueue.startTransaction(id);
// Orders ClusterUIDs by their `second` field only. Because the main loop uses it
// as the TreeMap comparator, two UIDs with the same `second` value share one entry.
// NOTE(review): presumably `second` alone identifies a cluster - confirm.
81 private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
84 public int compare(ClusterUID o1, ClusterUID o2) {
85 return Long.compare(o1.second, o2.second);
// Main loop body (the method header and loop construct are not shown here).
// Remembers the executing thread, then repeatedly pumps pending updates from the
// operation queue, waits when there are none, and swaps the stream/cs caches
// under their mutexes. A permit is released on deathBarrier at the end so that
// close() can return.
92 mainProgramThread = Thread.currentThread();
// Updates grouped per cluster, ordered by clusterComparator.
96 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<>(clusterComparator);
101 if (!updates.isEmpty())
103 operationQueue.pumpUpdates(updates);
105 if(updates.isEmpty()) {
// Measure how long we wait for new work; System.nanoTime() differences are in
// nanoseconds, so the threshold below is 4 seconds.
107 long start = System.nanoTime();
109 operationQueue.waitFor();
114 long duration = System.nanoTime()-start;
// NOTE(review): the branch body is not visible; presumably a long wait triggers
// the idle handling (checkIdle) - confirm.
115 if(duration > 4000000000L) {
120 // long sss = System.nanoTime();
126 * Here we are actively processing updates from client.
127 * Maintain necessary caching here.
// streamLRU section: mutex acquired, failures of the two Acorn exception types are
// logged (not rethrown here), mutex released.
// NOTE(review): the guarded call is not visible; the log text suggests swapChunks().
130 clusters.streamLRU.acquireMutex();
133 } catch (AcornAccessVerificationException | IllegalAcornStateException e) {
134 LOGGER.error("cluster chunk swapping failed", e);
136 clusters.streamLRU.releaseMutex();
// csLRU section: unlike above, any Throwable is wrapped and rethrown as
// IllegalAcornStateException.
138 clusters.csLRU.acquireMutex();
141 } catch (Throwable t) {
142 throw new IllegalAcornStateException(t);
144 clusters.csLRU.releaseMutex();
147 TimeLogger.log("Performed updates");
// Last-resort handler: anything escaping the loop is logged as fatal.
151 } catch (Throwable t) {
152 LOGGER.error("FATAL: MainProgram died unexpectedly", t);
// Unblocks close(), which waits on deathBarrier.acquire().
154 deathBarrier.release();
// Task type accepted by runIdle(). Only run() must be implemented; error() and
// success() are optional callbacks with default implementations.
159 static interface MainProgramRunnable {
// The work to perform; may throw any Exception.
160 void run() throws Exception;
// Default failure callback: logs the exception at error level.
161 default void error(Exception e) {
162 LOGGER.error("An error occured", e);
// Default success callback: does nothing.
164 default void success() {}
// Distributes the given per-cluster operation lists over the updater executors
// and blocks until the semaphore hands back `acquireAmount` permits.
// updates: operations grouped by cluster UID
// throws InterruptedException if interrupted while waiting on the semaphore
167 private void runUpdates(TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates) throws InterruptedException {
// Reset the per-executor scratch lists left over from the previous round.
169 for(int i=0;i<CLUSTER_THREADS;i++)
170 updateSchedules[i].clear();
// NOTE(review): the statement guarded here is not visible; presumably an early return.
172 if (updates.isEmpty())
175 final Semaphore s = new Semaphore(0);
177 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
178 ClusterUID key = entry.getKey();
// The slot depends only on the key, so all operations of one cluster end up in
// the same schedule and hence on the same single-thread executor. The mask
// covers every slot only while the executor count is a power of two.
179 int hash = key.hashCode() & (clusterUpdateThreads.length-1);
180 updateSchedules[hash].addAll(entry.getValue());
183 // final AtomicLong elapsed = new AtomicLong(0);
184 int acquireAmount = 0;
185 for(int i=0;i<CLUSTER_THREADS;i++) {
186 final List<ClusterUpdateOperation> ops = updateSchedules[i];
// Only executors that actually received work get a task.
187 if (!ops.isEmpty()) {
189 clusterUpdateThreads[i].submit(new Callable<Object>() {
192 public Object call() throws Exception {
193 //long st = System.nanoTime();
// Operations are processed in list order (the per-operation call is not visible).
195 for(ClusterUpdateOperation op : ops) {
203 // long duration = System.nanoTime()-st;
204 // elapsed.addAndGet(duration);
205 // double dur = 1e-9*duration;
207 // System.err.println("duration=" + dur + "s. " + ops.size());
// Wait for the submitted tasks.
// NOTE(review): neither the increment of acquireAmount nor the matching
// s.release() is visible; presumably one permit per submitted task, released
// even when a task fails - confirm, otherwise this wait can hang.
213 s.acquire(acquireAmount);
218 * This shall be run when no updates are currently available.
// Only when the operation queue reports empty: drains the scheduled tasks into a
// local list and processes them one by one in list order.
220 private void runTasksIfEmpty() {
221 if(operationQueue.isEmpty()) {
222 List<MainProgramRunnable> todo = new ArrayList<>();
223 operationQueue.pumpTasks(todo);
224 for(MainProgramRunnable runnable : todo) {
// NOTE(review): the try body and this handler are not visible; presumably
// run()/success() are invoked and failures are routed to runnable.error(e).
228 } catch (Exception e) {
236 * This gets called when an idle period has been detected
// While the operation queue stays empty, forces the four LRU caches to swap in
// the order cs -> stream -> file -> cluster, then asks the client to try a
// snapshot. Each stage re-checks the queue and breaks off once work has arrived.
// NOTE(review): the loop headers are not visible; each stage presumably repeats
// swapForced() while it keeps returning true.
238 private void checkIdle() throws IOException, IllegalAcornStateException, AcornAccessVerificationException {
240 // Was this a time-out or a new stream request?
241 if(operationQueue.isEmpty()) {
244 * We are idling here.
245 * Flush all caches gradually
248 // Write pending cs to disk
249 boolean written = clusters.csLRU.swapForced();
251 if(!operationQueue.isEmpty()) break;
252 written = clusters.csLRU.swapForced();
254 // Write pending chunks to disk
255 written = clusters.streamLRU.swapForced();
257 if(!operationQueue.isEmpty()) break;
258 written = clusters.streamLRU.swapForced();
260 // Write pending files to disk
261 written = clusters.fileLRU.swapForced();
263 if(!operationQueue.isEmpty()) break;
264 written = clusters.fileLRU.swapForced();
266 // Write pending clusters to disk
267 written = clusters.clusterLRU.swapForced();
269 if(!operationQueue.isEmpty()) break;
270 written = clusters.clusterLRU.swapForced();
// Runs after the cache stages above.
273 client.tryMakeSnapshot();
281 * This schedules tasks to be run in MainProgram thread
282 * Called from other threads than MainProgram thread
// Hands the task to the operation queue. The same scheduleTask call is what
// close() uses to wake the main loop.
285 void runIdle(MainProgramRunnable task) {
286 operationQueue.scheduleTask(task);
290 * Mutex for streamLRU is assumed here
// Calls streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE) repeatedly until it
// returns false.
293 private void swapChunks() throws AcornAccessVerificationException, IllegalAcornStateException {
295 // Cache chunks during update operations
// Empty-bodied loop: all the work happens inside swap().
296 while(clusters.streamLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
// Counterpart of swapChunks() for csLRU: calls csLRU.swap(Long.MAX_VALUE,
// CHUNK_CACHE_SIZE) repeatedly until it returns false. The main loop holds the
// csLRU mutex around the section where this is presumably invoked - confirm.
299 private void swapCS() throws AcornAccessVerificationException, IllegalAcornStateException {
// NOTE(review): the comment below mirrors swapChunks(); this method works on
// csLRU, not on the chunk cache.
301 // Cache chunks during update operations
302 while(clusters.csLRU.swap(Long.MAX_VALUE, CHUNK_CACHE_SIZE));
306 * Called by DB client write threads
// Asks the operation queue to commit its last stream chunk and keeps the returned
// chunk for the diagnostic below.
// NOTE(review): the method header and the condition guarding the log call are
// not visible; the message implies it fires when the program is already closed.
310 ClusterStreamChunk last = operationQueue.commitLast();
312 LOGGER.error("Trying to commit operation after MainProgram is closed! Operation is " + last);
318 * Called by DB client write threads
// Queues one cluster update operation while holding the streamLRU mutex.
// operation: the update to enqueue
// throws IllegalAcornStateException: any other Throwable raised in the guarded
// section is wrapped into this type
320 void schedule(ClusterUpdateOperation operation) throws IllegalAcornStateException {
// NOTE(review): the guarding condition is not visible; the message implies the
// closed state is checked first.
323 LOGGER.error("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
326 clusters.streamLRU.acquireMutex();
330 operationQueue.scheduleUpdate(operation);
// NOTE(review): handler body not visible; presumably rethrows unchanged so the
// exception is not double-wrapped by the Throwable handler below.
333 } catch (IllegalAcornStateException e) {
335 } catch (Throwable t) {
336 throw new IllegalAcornStateException(t);
// NOTE(review): presumably inside a finally block so the mutex is always released.
338 clusters.streamLRU.releaseMutex();
// Shuts the program down: wakes the main loop with a no-op task, waits for the
// loop to release deathBarrier, then gives each updater executor up to 500 ms
// to terminate and clears its slot in the array.
344 public void close() {
348 // This will wake up the sleeping beauty
349 operationQueue.scheduleTask(() -> {});
// Blocks until the main loop has released its permit on the way out.
352 deathBarrier.acquire();
// NOTE(review): handler body not visible; check whether the interrupt status is
// restored with Thread.currentThread().interrupt().
353 } catch (InterruptedException e) {
// NOTE(review): loop body not visible; presumably executor.shutdown().
356 for (ExecutorService executor : clusterUpdateThreads)
359 for (int i = 0; i < clusterUpdateThreads.length; i++) {
361 ExecutorService executor = clusterUpdateThreads[i];
// The boolean result of awaitTermination is ignored, so a timeout after 500 ms
// goes unnoticed and the slot is cleared regardless.
362 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
363 clusterUpdateThreads[i] = null;
364 } catch (InterruptedException e) {
// The interruption is only logged; the interrupt status is not visibly restored.
365 LOGGER.error("clusterUpdateThread[{}] termination interrupted", i, e);
// Debug check that the caller is the thread recorded in mainProgramThread.
// Uses the Java `assert` statement, so it only has an effect when assertions
// are enabled (-ea).
370 void assertMainProgramThread() {
371 assert(Thread.currentThread().equals(mainProgramThread));
// Debug check that the caller is NOT the thread recorded in mainProgramThread.
// Uses the Java `assert` statement, so it only has an effect when assertions
// are enabled (-ea).
374 void assertNoMainProgramThread() {
375 assert(!Thread.currentThread().equals(mainProgramThread));