--- /dev/null
+package org.simantics.acorn;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.simantics.acorn.lru.ClusterStreamChunk;
+import org.simantics.acorn.lru.ClusterUpdateOperation;
+import org.simantics.db.service.ClusterUID;
+import org.simantics.utils.logging.TimeLogger;
+
+public class MainProgram implements Runnable, Closeable {
+
+ private static final int CLUSTER_THREADS = 4;
+ private static final int CHUNK_CACHE_SIZE = 100;
+
+ private final GraphClientImpl2 client;
+ private final ClusterManager clusters;
+ private final ExecutorService[] clusterUpdateThreads;
+ private final List<ClusterUpdateOperation>[] updateSchedules;
+
+ private int residentOperationBytes = 0;
+ private long currentChangeSetId = -1;
+ private int nextChunkId = 0;
+ private boolean alive = true;
+ private Semaphore deathBarrier = new Semaphore(0);
+
+ final Semaphore mutex = new Semaphore(1);
+ final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();
+
+ static class ClusterThreadFactory implements ThreadFactory {
+
+ final String name;
+ final boolean daemon;
+
+ public ClusterThreadFactory(String name, boolean daemon) {
+ this.name = name;
+ this.daemon = daemon;
+ }
+
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, name);
+ thread.setDaemon(daemon);
+ return thread;
+ }
+ }
+
+ public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
+
+ this.client = client;
+ this.clusters = clusters;
+ this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
+ this.updateSchedules = new ArrayList[CLUSTER_THREADS];
+ for(int i=0;i<clusterUpdateThreads.length;i++) {
+ clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(new ClusterThreadFactory("Cluster Updater " + (i+1), false));
+ updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
+ }
+ }
+
+ public void startTransaction(long id) {
+ currentChangeSetId = id;
+ nextChunkId = 0;
+ }
+
+ private static Comparator<ClusterUID> clusterComparator = new Comparator<ClusterUID>() {
+
+ @Override
+ public int compare(ClusterUID o1, ClusterUID o2) {
+ return Long.compare(o1.second, o2.second);
+ }
+ };
+
+ @Override
+ public void run() {
+ try {
+
+ mutex.acquire();
+ main:
+ while(alive) {
+
+ TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates = new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);
+
+ synchronized(MainProgram.this) {
+
+ while(!operations.isEmpty() && updates.size() < 100) {
+
+ ClusterStreamChunk chunk = operations.pollFirst();
+
+ for(int i=chunk.nextToProcess;i<chunk.operations.size();i++) {
+ ClusterUpdateOperation o = chunk.operations.get(i);
+ ClusterUID uid = o.uid;
+ List<ClusterUpdateOperation> ops = updates.get(uid);
+ if(ops == null) {
+ ops = new ArrayList<ClusterUpdateOperation>();
+ updates.put(uid, ops);
+ }
+ ops.add(o);
+ }
+
+ chunk.nextToProcess = chunk.operations.size();
+
+ if(!chunk.isCommitted()) {
+ assert(operations.isEmpty());
+ operations.add(chunk);
+ break;
+ }
+
+ }
+
+ if(updates.isEmpty()) {
+ try {
+ long start = System.nanoTime();
+ mutex.release();
+ MainProgram.this.wait(5000);
+ mutex.acquire();
+ if (!alive)
+ break main;
+ long duration = System.nanoTime()-start;
+ if(duration > 4000000000L) {
+
+ // Was this a time-out or a new stream request?
+ if(operations.isEmpty()) {
+
+ /*
+ * We are idling here.
+ * Flush all caches gradually
+ */
+
+ // Write pending cs to disk
+ boolean written = clusters.csLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.csLRU.swapForced();
+ }
+ // Write pending chunks to disk
+ written = clusters.streamLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.streamLRU.swapForced();
+ }
+ // Write pending files to disk
+ written = clusters.fileLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.fileLRU.swapForced();
+ }
+ // Write pending clusters to disk
+ written = clusters.clusterLRU.swapForced();
+ while(written) {
+ if(!updates.isEmpty()) break;
+ written = clusters.clusterLRU.swapForced();
+ }
+
+ client.tryMakeSnapshot();
+ }
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ }
+
+ }
+
+// long sss = System.nanoTime();
+
+ for(int i=0;i<CLUSTER_THREADS;i++)
+ updateSchedules[i].clear();
+
+ final Semaphore s = new Semaphore(0);
+
+ for(Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
+ ClusterUID key = entry.getKey();
+ int hash = key.hashCode() & (clusterUpdateThreads.length-1);
+ updateSchedules[hash].addAll(entry.getValue());
+ }
+
+ // final AtomicLong elapsed = new AtomicLong(0);
+ int acquireAmount = 0;
+ for(int i=0;i<CLUSTER_THREADS;i++) {
+ final List<ClusterUpdateOperation> ops = updateSchedules[i];
+ if (!ops.isEmpty()) {
+ acquireAmount++;
+ clusterUpdateThreads[i].execute(() -> {
+
+ //long st = System.nanoTime();
+ for(ClusterUpdateOperation op : ops) {
+ op.run();
+ }
+ s.release();
+ // long duration = System.nanoTime()-st;
+ // elapsed.addAndGet(duration);
+ // double dur = 1e-9*duration;
+ // if(dur > 0.05)
+ // System.err.println("duration=" + dur + "s. " + ops.size());
+ });
+ }
+ }
+
+ s.acquire(acquireAmount);
+
+ /*
+ * Here we are actively processing updates from client.
+ * Maintain necessary caching here.
+ */
+
+ clusters.streamLRU.acquireMutex();
+ try {
+ swapChunks();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ clusters.streamLRU.releaseMutex();
+ }
+ clusters.csLRU.acquireMutex();
+ try {
+ swapCS();
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+ clusters.csLRU.releaseMutex();
+ }
+
+ TimeLogger.log("Performed updates");
+
+ }
+
+ } catch (Throwable t) {
+ t.printStackTrace();
+ } finally {
+ deathBarrier.release();
+ }
+
+ }
+
+ /*
+ * Mutex for streamLRU is assumed here
+ *
+ */
+ private void swapChunks() {
+
+ // Cache chunks during update operations
+ boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ while(written) {
+ written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ }
+
+ }
+
+ private void swapCS() {
+
+ // Cache chunks during update operations
+ boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ while(written) {
+ written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
+ }
+
+ }
+
+ public synchronized void committed() {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if (!alive) {
+ System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
+// return;
+ }
+ if(last != null) last.commit();
+
+ }
+
+ public synchronized void schedule(ClusterUpdateOperation operation) {
+ if (!alive) {
+ System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
+// return;
+ }
+ clusters.streamLRU.acquireMutex();
+
+ try {
+
+ ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
+ if(last == null || last.isCommitted()) {
+ String id = "" + currentChangeSetId + "-" + nextChunkId++;
+ last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
+ operations.add(last);
+ }
+
+ String chunkId = last.getKey();
+ int chunkOffset = last.operations.size();
+ operation.scheduled(chunkId + "." + chunkOffset);
+
+ last.addOperation(operation);
+
+ swapChunks();
+
+ notifyAll();
+
+ } catch (Throwable t) {
+ throw new IllegalStateException(t);
+ } finally {
+
+ clusters.streamLRU.releaseMutex();
+
+ }
+
+ }
+
+ @Override
+ public void close() {
+ alive = false;
+ synchronized (this) {
+ notifyAll();
+ }
+ try {
+ deathBarrier.acquire();
+ } catch (InterruptedException e) {
+ }
+
+ for (ExecutorService executor : clusterUpdateThreads)
+ executor.shutdown();
+
+ for (int i = 0; i < clusterUpdateThreads.length; i++) {
+ try {
+ ExecutorService executor = clusterUpdateThreads[i];
+ executor.awaitTermination(500, TimeUnit.MILLISECONDS);
+ clusterUpdateThreads[i] = null;
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+}