package org.simantics.acorn;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

import org.simantics.acorn.lru.ClusterStreamChunk;
import org.simantics.acorn.lru.ClusterUpdateOperation;
import org.simantics.db.service.ClusterUID;
import org.simantics.utils.logging.TimeLogger;

/**
 * Main processing loop that drains scheduled {@link ClusterUpdateOperation}s.
 *
 * <p>Operations are appended to {@link ClusterStreamChunk}s via
 * {@link #schedule(ClusterUpdateOperation)}. {@link #run()} repeatedly collects
 * pending operations, groups them by {@link ClusterUID}, distributes the groups
 * over {@value #CLUSTER_THREADS} single-threaded executors and waits for the
 * executors to finish. When no work arrives for a while, the loop flushes the
 * LRU caches of the {@link ClusterManager} and asks the client to try a
 * snapshot.
 *
 * <p>Synchronization used by this class:
 * <ul>
 * <li>the monitor of {@code this} guards {@link #operations} and is used for
 * wait/notify between producers and the main loop;</li>
 * <li>{@link #mutex} is held by the thread executing {@link #run()} except
 * while that thread is waiting for new work.</li>
 * </ul>
 */
public class MainProgram implements Runnable, Closeable {

    /**
     * Number of update executors. Must be a power of two because the executor
     * index is computed with a bit mask in {@link #run()}.
     */
    private static final int CLUSTER_THREADS = 4;
    private static final int CHUNK_CACHE_SIZE = 100;

    /** Upper bound of distinct cluster keys collected into one update round. */
    private static final int MAX_CLUSTERS_PER_ROUND = 100;
    /** How long the main loop waits for new operations before re-checking, in ms. */
    private static final long IDLE_WAIT_MILLIS = 5000;
    /** A wait that lasted longer than this (in ns) is treated as idling. */
    private static final long IDLE_THRESHOLD_NANOS = 4000000000L;

    private final GraphClientImpl2 client;
    private final ClusterManager clusters;
    private final ExecutorService[] clusterUpdateThreads;
    private final List<ClusterUpdateOperation>[] updateSchedules;

    private int residentOperationBytes = 0;
    private long currentChangeSetId = -1;
    private int nextChunkId = 0;
    // volatile: written by close() and read by run() on a different thread
    // outside of any lock (loop condition).
    private volatile boolean alive = true;
    /** Released once by {@link #run()} when it terminates; awaited by {@link #close()}. */
    private final Semaphore deathBarrier = new Semaphore(0);

    final Semaphore mutex = new Semaphore(1);
    final LinkedList<ClusterStreamChunk> operations = new LinkedList<>();

    /** Thread factory producing threads with a fixed name and daemon flag. */
    static class ClusterThreadFactory implements ThreadFactory {

        final String name;
        final boolean daemon;

        public ClusterThreadFactory(String name, boolean daemon) {
            this.name = name;
            this.daemon = daemon;
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r, name);
            thread.setDaemon(daemon);
            return thread;
        }
    }

    /**
     * Creates the program and its update executors; does not start any
     * processing. The caller is expected to execute {@link #run()} on a
     * thread of its own.
     *
     * @param client   client asked to try a snapshot when the loop is idle
     * @param clusters provider of the LRU caches maintained by the loop
     */
    @SuppressWarnings("unchecked") // generic array creation; only List<ClusterUpdateOperation> is ever stored
    public MainProgram(GraphClientImpl2 client, ClusterManager clusters) {
        this.client = client;
        this.clusters = clusters;
        this.clusterUpdateThreads = new ExecutorService[CLUSTER_THREADS];
        this.updateSchedules = new ArrayList[CLUSTER_THREADS];
        for (int i = 0; i < clusterUpdateThreads.length; i++) {
            clusterUpdateThreads[i] = Executors.newSingleThreadExecutor(
                    new ClusterThreadFactory("Cluster Updater " + (i + 1), false));
            updateSchedules[i] = new ArrayList<ClusterUpdateOperation>();
        }
    }

    /**
     * Records the change set id used for naming subsequently created chunks
     * and restarts chunk numbering from zero.
     *
     * @param id change set id of the transaction being started
     */
    public void startTransaction(long id) {
        currentChangeSetId = id;
        nextChunkId = 0;
    }

    // Orders cluster ids by their 'second' component only; ids that differ
    // only in another component share one entry in the update map.
    private static final Comparator<ClusterUID> clusterComparator =
            Comparator.comparingLong(uid -> uid.second);

    /**
     * Runs the update loop until {@link #close()} is called or an unexpected
     * error occurs. Always releases the death barrier on exit so that
     * {@link #close()} can proceed.
     */
    @Override
    public void run() {
        try {

            mutex.acquire();
            main:
            while (alive) {

                TreeMap<ClusterUID, List<ClusterUpdateOperation>> updates =
                        new TreeMap<ClusterUID, List<ClusterUpdateOperation>>(clusterComparator);

                synchronized (MainProgram.this) {

                    // Collect not yet processed operations, grouped by cluster.
                    while (!operations.isEmpty() && updates.size() < MAX_CLUSTERS_PER_ROUND) {

                        ClusterStreamChunk chunk = operations.pollFirst();

                        for (int i = chunk.nextToProcess; i < chunk.operations.size(); i++) {
                            ClusterUpdateOperation o = chunk.operations.get(i);
                            ClusterUID uid = o.uid;
                            List<ClusterUpdateOperation> ops = updates.get(uid);
                            if (ops == null) {
                                ops = new ArrayList<ClusterUpdateOperation>();
                                updates.put(uid, ops);
                            }
                            ops.add(o);
                        }

                        chunk.nextToProcess = chunk.operations.size();

                        // An uncommitted chunk may still receive operations:
                        // put it back and stop collecting.
                        if (!chunk.isCommitted()) {
                            assert (operations.isEmpty());
                            operations.add(chunk);
                            break;
                        }

                    }

                    if (updates.isEmpty()) {
                        // close() flips 'alive' while holding this monitor, so
                        // checking here avoids sleeping through a shutdown
                        // request that arrived before we started waiting.
                        if (!alive)
                            break main;
                        try {
                            long start = System.nanoTime();
                            mutex.release();
                            try {
                                MainProgram.this.wait(IDLE_WAIT_MILLIS);
                            } finally {
                                // Take the mutex back even when the wait was
                                // interrupted; otherwise the loop would go on
                                // without it and release it a second time.
                                mutex.acquireUninterruptibly();
                            }
                            if (!alive)
                                break main;
                            long duration = System.nanoTime() - start;
                            if (duration > IDLE_THRESHOLD_NANOS) {
                                // Was this a time-out or a new stream request?
                                if (operations.isEmpty()) {

                                    /*
                                     * We are idling here.
                                     * Flush all caches gradually
                                     */

                                    // NOTE(review): 'updates' cannot change inside
                                    // the loops below, so the isEmpty() checks never
                                    // trigger; kept as in the original.

                                    // Write pending cs to disk
                                    boolean written = clusters.csLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.csLRU.swapForced();
                                    }
                                    // Write pending chunks to disk
                                    written = clusters.streamLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.streamLRU.swapForced();
                                    }
                                    // Write pending files to disk
                                    written = clusters.fileLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.fileLRU.swapForced();
                                    }
                                    // Write pending clusters to disk
                                    written = clusters.clusterLRU.swapForced();
                                    while (written) {
                                        if (!updates.isEmpty()) break;
                                        written = clusters.clusterLRU.swapForced();
                                    }

                                    client.tryMakeSnapshot();
                                }
                            }
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }

                }

                for (int i = 0; i < CLUSTER_THREADS; i++)
                    updateSchedules[i].clear();

                // Counts finished executor tasks of this round.
                final Semaphore s = new Semaphore(0);

                // Operations of one cluster always go to the same executor.
                for (Map.Entry<ClusterUID, List<ClusterUpdateOperation>> entry : updates.entrySet()) {
                    ClusterUID key = entry.getKey();
                    int hash = key.hashCode() & (clusterUpdateThreads.length - 1);
                    updateSchedules[hash].addAll(entry.getValue());
                }

                int acquireAmount = 0;
                for (int i = 0; i < CLUSTER_THREADS; i++) {
                    final List<ClusterUpdateOperation> ops = updateSchedules[i];
                    if (!ops.isEmpty()) {
                        acquireAmount++;
                        clusterUpdateThreads[i].execute(() -> {
                            try {
                                for (ClusterUpdateOperation op : ops) {
                                    op.run();
                                }
                            } finally {
                                // Release even if an operation throws, so the
                                // main loop is never left waiting forever.
                                s.release();
                            }
                        });
                    }
                }

                // Wait until every executor task of this round has finished.
                s.acquire(acquireAmount);

                /*
                 * Here we are actively processing updates from client.
                 * Maintain necessary caching here.
                 */

                clusters.streamLRU.acquireMutex();
                try {
                    swapChunks();
                } catch (Throwable t) {
                    throw new IllegalStateException(t);
                } finally {
                    clusters.streamLRU.releaseMutex();
                }
                clusters.csLRU.acquireMutex();
                try {
                    swapCS();
                } catch (Throwable t) {
                    throw new IllegalStateException(t);
                } finally {
                    clusters.csLRU.releaseMutex();
                }

                TimeLogger.log("Performed updates");

            }

        } catch (Throwable t) {
            t.printStackTrace();
        } finally {
            deathBarrier.release();
        }

    }

    /*
     * Mutex for streamLRU is assumed here
     */
    private void swapChunks() {

        // Cache chunks during update operations
        boolean written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        while (written) {
            written = clusters.streamLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        }

    }

    /*
     * The caller in run() holds the csLRU mutex while calling this.
     */
    private void swapCS() {

        // Cache change sets during update operations
        boolean written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        while (written) {
            written = clusters.csLRU.swap(Integer.MAX_VALUE, CHUNK_CACHE_SIZE);
        }

    }

    /**
     * Marks the most recently queued chunk, if any, as committed. Only logs a
     * warning when called after {@link #close()}; the commit is still applied.
     */
    public synchronized void committed() {

        ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
        if (!alive) {
            System.err.println("Trying to commit operation after MainProgram is closed! Operation is " + last);
//          return;
        }
        if (last != null) last.commit();

    }

    /**
     * Appends an operation to the last open chunk, creating a new chunk when
     * there is none or the last one is already committed, and wakes up the
     * main loop. Only logs a warning when called after {@link #close()}; the
     * operation is still queued.
     *
     * @param operation the operation to queue
     * @throws IllegalStateException wrapping any error raised while queueing
     */
    public synchronized void schedule(ClusterUpdateOperation operation) {
        if (!alive) {
            System.err.println("Trying to schedule operation after MainProgram is closed! Operation is " + operation);
//          return;
        }
        clusters.streamLRU.acquireMutex();

        try {

            ClusterStreamChunk last = operations.isEmpty() ? null : operations.getLast();
            if (last == null || last.isCommitted()) {
                String id = "" + currentChangeSetId + "-" + nextChunkId++;
                last = new ClusterStreamChunk(clusters, clusters.streamLRU, id);
                operations.add(last);
            }

            String chunkId = last.getKey();
            int chunkOffset = last.operations.size();
            operation.scheduled(chunkId + "." + chunkOffset);

            last.addOperation(operation);

            swapChunks();

            notifyAll();

        } catch (Throwable t) {
            throw new IllegalStateException(t);
        } finally {

            clusters.streamLRU.releaseMutex();

        }

    }

    /**
     * Requests the main loop to stop, waits until {@link #run()} has exited
     * and then shuts the update executors down, waiting up to 500 ms for each.
     * If the calling thread is interrupted while waiting, its interrupt status
     * is restored before this method returns.
     */
    @Override
    public void close() {
        // Flip the flag under the monitor so that run() cannot miss the
        // notification between checking 'alive' and starting to wait.
        synchronized (this) {
            alive = false;
            notifyAll();
        }
        boolean interrupted = false;
        try {
            deathBarrier.acquire();
        } catch (InterruptedException e) {
            interrupted = true;
        }

        for (ExecutorService executor : clusterUpdateThreads)
            executor.shutdown();

        for (int i = 0; i < clusterUpdateThreads.length; i++) {
            try {
                ExecutorService executor = clusterUpdateThreads[i];
                executor.awaitTermination(500, TimeUnit.MILLISECONDS);
                clusterUpdateThreads[i] = null;
            } catch (InterruptedException e) {
                e.printStackTrace();
                interrupted = true;
            }
        }

        if (interrupted)
            Thread.currentThread().interrupt();
    }

}