1 package org.simantics.acorn;
3 import java.io.Closeable;
4 import java.util.ArrayList;
5 import java.util.Comparator;
6 import java.util.LinkedList;
9 import java.util.TreeMap;
10 import java.util.concurrent.ExecutorService;
11 import java.util.concurrent.Executors;
12 import java.util.concurrent.Semaphore;
13 import java.util.concurrent.ThreadFactory;
14 import java.util.concurrent.TimeUnit;
16 import org.simantics.acorn.lru.ClusterStreamChunk;
17 import org.simantics.acorn.lru.ClusterUpdateOperation;
18 import org.simantics.db.service.ClusterUID;
19 import org.simantics.utils.logging.TimeLogger;
// Background worker of the Acorn database client: consumes queued cluster
// stream chunks and dispatches their update operations onto a small pool of
// single-threaded "cluster updater" executors. Runs until closed; Closeable
// shuts the worker and its executors down.
// NOTE(review): this view is line-sampled — several original lines are missing
// between the statements below; comments describe only what is visible.
21 public class MainProgram implements Runnable, Closeable {
// Number of updater lanes; a cluster is mapped to a lane by hash, so all
// operations of one cluster execute on the same single thread (per-cluster order).
23 private static final int CLUSTER_THREADS = 4;
// Swap budget handed to the stream/cs LRU caches in swapChunks()/swapCS().
24 private static final int CHUNK_CACHE_SIZE = 100;
26 private final GraphClientImpl2 client;
27 private final ClusterManager clusters;
// One single-thread executor per lane (created in the constructor).
28 private final ExecutorService[] clusterUpdateThreads;
// Per-lane scratch lists, rebuilt on each scheduling round of the main loop.
29 private final List<ClusterUpdateOperation>[] updateSchedules;
31 private int residentOperationBytes = 0;
// Id of the change set currently being streamed; -1 until startTransaction().
32 private long currentChangeSetId = -1;
// Monotonic counter used to build unique chunk keys in schedule().
33 private int nextChunkId = 0;
// presumably set to false by close(); guards committed()/schedule() — TODO confirm,
// the assignment is not visible in this view.
34 private boolean alive = true;
// Released by the main loop on exit; close() blocks on it to join the worker.
35 private Semaphore deathBarrier = new Semaphore(0);
37 final Semaphore mutex = new Semaphore(1);
// FIFO of incoming chunks; the tail chunk may still be open (uncommitted).
38 final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
// Thread factory that names the updater threads and marks them (non-)daemon.
// NOTE(review): the `name`/`daemon` field declarations, constructor body and
// the `return thread;` of newThread() are missing from this view — presumably
// the constructor stores both arguments; confirm against the full source.
40 static class ClusterThreadFactory implements ThreadFactory {
45 public ClusterThreadFactory(String name, boolean daemon) {
51 public Thread newThread(Runnable r) {
// Every thread from one factory instance shares the same name.
52 Thread thread = new Thread(r, name);
53 thread.setDaemon(daemon);
// Creates the worker and its CLUSTER_THREADS single-thread updater executors.
// NOTE(review): the assignment of `this.client` is not visible in this view —
// presumably it precedes `this.clusters = clusters;`; confirm against full source.
58 public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
61 this.clusters = clusters;
62 this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
// Generic array creation is not possible in Java; raw ArrayList[] is used here
// (unchecked), each slot holding a List<ClusterUpdateOperation>.
63 this.updateSchedules = new ArrayList[CLUSTER_THREADS];
64 for(int i=0;i<clusterUpdateThreads.length;i++) {
// Non-daemon threads: close() must shut these executors down explicitly.
65 clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
66 updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
// Records the change-set id of the transaction now being streamed; schedule()
// uses it to build chunk keys of the form "<changeSetId>-<chunkId>".
70 public void startTransaction(long id) {
71 currentChangeSetId = id;
// Orders ClusterUIDs by their `second` component only — the `first` component
// is ignored. NOTE(review): presumably `first` is constant within one database
// session so this is sufficient for the TreeMap in the main loop; confirm.
75 private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
78 public int compare(ClusterUID o1, ClusterUID o2) {
79 return Long.compare(o1.second, o2.second);
// Interior of the main worker loop (the enclosing run() signature and several
// structural lines — loop/try headers, closing braces — are missing from this
// view; comments below describe only the visible statements).
// One round: drain queued chunks into a per-cluster map, then fan the
// operations out to the updater lanes and wait for them to finish.
91 TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
// Monitor of this MainProgram guards `operations` and is also the wait/notify
// channel used when the queue is empty (see wait(5000) below).
93 synchronized(MainProgram.this) {
// Bounded drain: at most 100 distinct clusters are collected per round.
95 while(!operations.isEmpty() && updates.size() < 100) {
97 ClusterStreamChunk chunk = operations.pollFirst();
// Resume where the previous round stopped within this chunk.
99 for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
100 ClusterUpdateOperation o = chunk.operations.get(i);
101 ClusterUID uid = o.uid;
102 List<ClusterUpdateOperation> ops = updates.get(uid);
104 ops = new ArrayList<ClusterUpdateOperation>();
105 updates.put(uid, ops);
110 chunk.nextToProcess = chunk.operations.size();
// An uncommitted (still growing) chunk is put back so later operations
// appended by schedule() are picked up next round. It can only be the
// last chunk in the queue — hence the emptiness assertion.
112 if(!chunk.isCommitted()) {
113 assert(operations.isEmpty());
114 operations.add(chunk);
// Nothing to do: idle-wait for new work, and if we genuinely timed out,
// use the idle time to flush caches to disk.
120 if(updates.isEmpty()) {
122 long start = System.nanoTime();
// Releases the monitor while waiting; schedule()/committed() presumably
// notify — the notify call is not visible in this view, TODO confirm.
124 MainProgram.this.wait(5000);
128 long duration = System.nanoTime()-start;
// > 4 s elapsed: treat as a real timeout rather than an early wakeup.
129 if(duration > 4000000000L) {
131 // Was this a time-out or a new stream request?
132 if(operations.isEmpty()) {
135 * We are idling here.
136 * Flush all caches gradually
// Each cache is swapped twice, bailing out between steps as soon
// as new work arrives (`updates` refilled elsewhere — the refill
// lines are not visible in this view).
139 // Write pending cs to disk
140 boolean written = clusters.csLRU.swapForced();
142 if(!updates.isEmpty()) break;
143 written = clusters.csLRU.swapForced();
145 // Write pending chunks to disk
146 written = clusters.streamLRU.swapForced();
148 if(!updates.isEmpty()) break;
149 written = clusters.streamLRU.swapForced();
151 // Write pending files to disk
152 written = clusters.fileLRU.swapForced();
154 if(!updates.isEmpty()) break;
155 written = clusters.fileLRU.swapForced();
157 // Write pending clusters to disk
158 written = clusters.clusterLRU.swapForced();
160 if(!updates.isEmpty()) break;
161 written = clusters.clusterLRU.swapForced();
// Everything flushed and still idle: opportunistic snapshot.
164 client.tryMakeSnapshot();
// NOTE(review): handler body not visible; if the exception is swallowed,
// the thread's interrupt status should be restored — confirm in full source.
167 } catch (InterruptedException e) {
175 // long sss = System.nanoTime();
// Rebuild the per-lane schedules for this round.
177 for(int i=0;i<CLUSTER_THREADS;i++)
178 updateSchedules[i].clear();
// Completion latch: each lane releases one permit when its batch is done.
180 final Semaphore s = new Semaphore(0);
182 for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
183 ClusterUID key = entry.getKey();
// Lane selection by hash; works as a mask because CLUSTER_THREADS is a
// power of two (4).
184 int hash = key.hashCode() & (clusterUpdateThreads.length-1);
185 updateSchedules[hash].addAll(entry.getValue());
188 // final AtomicLong elapsed = new AtomicLong(0);
189 int acquireAmount = 0;
190 for(int i=0;i<CLUSTER_THREADS;i++) {
191 final List<ClusterUpdateOperation> ops = updateSchedules[i];
// Only lanes with work contribute to the permit count awaited below.
192 if (!ops.isEmpty()) {
194 clusterUpdateThreads[i].execute(() -> {
196 //long st = System.nanoTime();
// NOTE(review): the per-op call (presumably op.run()/perform()) and the
// s.release() are not visible in this view — confirm in full source.
197 for(ClusterUpdateOperation op : ops) {
201 // long duration = System.nanoTime()-st;
202 // elapsed.addAndGet(duration);
203 // double dur = 1e-9*duration;
205 // System.err.println("duration=" + dur + "s. " + ops.size());
// Block until every busy lane has finished its batch for this round.
210 s.acquire(acquireAmount);
213 * Here we are actively processing updates from client.
214 * Maintain necessary caching here.
// Trim the stream-chunk cache under its own mutex; any failure is fatal
// for the worker (wrapped as IllegalStateException).
217 clusters.streamLRU.acquireMutex();
220 } catch (Throwable t) {
221 throw new IllegalStateException(t);
223 clusters.streamLRU.releaseMutex();
// Same trim for the change-set cache.
225 clusters.csLRU.acquireMutex();
228 } catch (Throwable t) {
229 throw new IllegalStateException(t);
231 clusters.csLRU.releaseMutex();
234 TimeLogger.log("Performed updates");
238 } catch (Throwable t) {
// Always signal close() that the worker thread has terminated.
241 deathBarrier.release();
// Trims the stream-chunk LRU down to CHUNK_CACHE_SIZE by swapping entries to
// disk. NOTE(review): the loop/condition between the two swap calls is not
// visible in this view — presumably it repeats while swap() reports progress.
* Mutex for streamLRU is assumed here
250 private void swapChunks() {
252 // Cache chunks during update operations
253 boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
255 written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Trims the change-set LRU down to CHUNK_CACHE_SIZE, mirroring swapChunks().
// Caller is presumably expected to hold the csLRU mutex — TODO confirm.
260 private void swapCS() {
262 // Cache chunks during update operations
263 boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
265 written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
// Marks the tail chunk committed so the main loop will no longer re-queue it.
// Synchronized on `this` — same monitor that guards `operations` in the loop.
// NOTE(review): the alive-check guarding the error print is not visible here.
270 public synchronized void committed() {
272 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
274 System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
277 if(last != null) last.commit();
// Appends one update operation to the tail chunk of the queue, opening a new
// chunk when the tail is absent or already committed. Runs under both the
// object monitor (queue) and the streamLRU mutex (chunk storage).
// NOTE(review): the alive-check, the notify of the waiting main loop and some
// structural lines are not visible in this sampled view — confirm in source.
281 public synchronized void schedule(ClusterUpdateOperation operation) {
283 System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
286 clusters.streamLRU.acquireMutex();
290 ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
291 if(last == null || last.isCommitted()) {
// Chunk key: "<changeSetId>-<chunkId>", unique per transaction.
292 String id = "" + currentChangeSetId + "-" + nextChunkId++;
293 last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
294 operations.add(last);
// The operation learns its address "<chunkKey>.<offset>" before being added.
297 String chunkId = last.getKey();
298 int chunkOffset = last.operations.size();
299 operation.scheduled(chunkId + "." + chunkOffset);
301 last.addOperation(operation);
// Any failure here is fatal; mutex is released on the non-error path below.
307 } catch (Throwable t) {
308 throw new IllegalStateException(t);
311 clusters.streamLRU.releaseMutex();
318 public void close() {
320 synchronized (this) {
324 deathBarrier.acquire();
325 } catch (InterruptedException e) {
328 for (ExecutorService executor : clusterUpdateThreads)
331 for (int i = 0; i < clusterUpdateThreads.length; i++) {
333 ExecutorService executor = clusterUpdateThreads[i];
334 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
335 clusterUpdateThreads[i] = null;
336 } catch (InterruptedException e) {