import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.eclipse.core.runtime.Platform;
import org.simantics.databoard.Bindings;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.DevelopmentKeys;
import org.simantics.db.Session;
import org.simantics.db.Statement;
import org.simantics.db.VirtualGraph;
+import org.simantics.db.common.ByteFileReader;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.MultiRead;
import org.simantics.db.request.RequestFlags;
+import org.simantics.db.service.Bytes;
import org.simantics.layer0.Layer0;
import org.simantics.utils.DataContainer;
import org.simantics.utils.Development;
+import org.simantics.utils.FileUtils;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
import org.simantics.utils.datastructures.disposable.AbstractDisposable;
final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
- public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
enum ThreadState {
}
+ final Scheduling scheduling;
+
public ThreadState[] threadStates;
final Object querySupportLock;
public void close() {
}
- public SessionTask getSubTask(ReadGraphImpl impl) {
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
- queueLength.decrementAndGet();
- return freeScheduling.remove(index);
- }
- index++;
- }
- }
- return null;
- }
/*
* We are running errands while waiting for requests to complete.
* We can only run work that is part of the current root request to avoid any deadlocks
*/
- public boolean performPending(ReadGraphImpl graph) {
- SessionTask task = getSubTask(graph);
+ // Executes at most one pending subtask that belongs to the same root request
+ // as 'under'. Returns true if a task was run, false if none was available.
+ public boolean performPending(ReadGraphImpl under) {
+ SessionTask task = scheduling.getSubTask(under);
if(task != null) {
- task.run(QueryProcessor.thread.get());
+ task.run(thread.get());
return true;
}
return false;
}
-
+
final public void scheduleNow(SessionTask request) {
- schedule(request, false);
+ // Offer the task to the scheduler; if it is handed back instead of being
+ // queued, execute it immediately on the calling thread.
+ SessionTask toExecute = scheduleOrReturnForExecution(request);
+ if(toExecute != null)
+ toExecute.run(thread.get());
}
- final public void scheduleLater(SessionTask request) {
- schedule(request, true);
- }
-
- AtomicInteger queueLength = new AtomicInteger(0);
-
- final public void schedule(SessionTask request, boolean late) {
+ // Delegates queueing to the Scheduling component. Returns the task back to
+ // the caller when it was not queued and must be executed by the caller
+ // (see scheduleNow for the execute-on-caller fallback).
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
- int queueLengthEstimate = queueLength.get();
- if(!late && queueLengthEstimate > 80) {
- request.run(thread.get());
- return;
- }
-
- assert(request != null);
-
- synchronized(querySupportLock) {
+ return scheduling.scheduleOrReturnForExecution(request);
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
-
- if(late) {
- int pos = request.position - 1;
- if(pos < freeScheduling.size()) {
- freeScheduling.add(pos, request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- freeScheduling.addLast(request);
- queueLength.incrementAndGet();
- requests.release();
- }
- }
- else {
- if(request.getLevel() < 4) {
- if(freeScheduling.size() < 100) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- } else {
- if(freeScheduling.size() < 20) {
- freeScheduling.addFirst(request);
- queueLength.incrementAndGet();
- requests.release();
- } else {
- request.run(thread.get());
- }
- }
- }
-
-
- }
-
- }
+ }
final int THREADS;
// Base class for units of work executed by query threads. A task may be bound
// to a root-level graph so that it can be recognized as a subtask of a
// currently-waiting root request (see isSubtask).
public static abstract class SessionTask {
- public final ReadGraphImpl graph;
+ // Root graph this task belongs to; null for free-standing tasks.
+ final protected ReadGraphImpl rootGraph;
private int counter = 0;
protected int position = 1;
private Exception trace;
- public SessionTask(ReadGraphImpl graph) {
- this.graph = graph;
- if(graph != null) graph.asyncBarrier.inc();
+ // Creates a task with no associated root graph.
+ public SessionTask() {
+ this(null);
}
-
- public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
- if(r1 == null || r2 == null) return false;
- return r1.getTopLevelGraph() == r2.getTopLevelGraph();
- }
+
+ // Creates a task bound to the given root graph (may be null).
+ public SessionTask(ReadGraphImpl rootGraph) {
+ this.rootGraph = rootGraph;
+ }
+
+ // True if 'graph' belongs to the same root request as this task
+ // (delegates to ReadGraphImpl.isParent — NOTE(review): null rootGraph
+ // handling depends on isParent semantics; verify).
+ public boolean isSubtask(ReadGraphImpl graph) {
+ return graph.isParent(rootGraph);
+ }
public abstract void run0(int thread);
trace = new Exception();
}
run0(thread);
- if(graph != null) graph.asyncBarrier.dec();
}
public boolean maybeReady() {
@Override
public String toString() {
- if(graph == null)
+ if(rootGraph == null)
return "SessionTask[no graph]";
else
- return "SessionTask[" + graph.parent + "]";
- }
-
- public int getLevel() {
- if(graph == null) return 0;
- else return graph.getLevel();
+ return "SessionTask[" + rootGraph.parent + "]";
}
}
}
- long waitingTime = 0;
-
- static int koss = 0;
- static int koss2 = 0;
-
public boolean resume(ReadGraphImpl graph) {
return executors[0].runSynchronized();
}
-
- //private WeakReference<GarbageTracker> garbageTracker;
-
- private class GarbageTracker {
-
- @Override
- protected void finalize() throws Throwable {
-
-// System.err.println("GarbageTracker");
-//
-// garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-
- super.finalize();
-
- }
-
- }
public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
throws DatabaseException {
- //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-
THREADS = threads;
THREAD_MASK = threads - 1;
+ scheduling = new Scheduling(requests);
+
querySupport = core;
cache = new QueryCache(core, threads);
session = querySupport.getSession();
querySupportLock = core.getLock();
executors = new QueryThread[THREADS];
-// queues = new ArrayList[THREADS];
-// threadLocks = new ReentrantLock[THREADS];
-// threadConditions = new Condition[THREADS];
threadStates = new ThreadState[THREADS];
-// ownTasks = new ArrayList[THREADS];
-// ownSyncTasks = new ArrayList[THREADS];
-// delayQueues = new ArrayList[THREADS * THREADS];
-
- // freeSchedule = new AtomicInteger(0);
-
-// for (int i = 0; i < THREADS * THREADS; i++) {
-// delayQueues[i] = new ArrayList<SessionTask>();
-// }
for (int i = 0; i < THREADS; i++) {
-
- // tasks[i] = new ArrayList<Runnable>();
-// ownTasks[i] = new ArrayList<SessionTask>();
-// ownSyncTasks[i] = new ArrayList<SessionTask>();
-// queues[i] = new ArrayList<SessionTask>();
-// threadLocks[i] = new ReentrantLock();
-// threadConditions[i] = threadLocks[i].newCondition();
- // limits[i] = false;
threadStates[i] = ThreadState.INIT;
-
}
for (int i = 0; i < THREADS; i++) {
}
+ /**
+  * Persists the serializable query cache contents to disk under the workspace
+  * "queryData" directory, one file per cluster. Entries without a classId are
+  * not serializable and are skipped. Counterpart of restore().
+  *
+  * @throws IOException if the target directory cannot be (re)created
+  */
+ public synchronized void save() throws IOException {
+
+ long start = System.nanoTime();
+
+ // Group serializable cache entries by cluster id.
+ Collection<CacheEntryBase> caches = allCaches(new CacheCollectionResult()).toCollection();
+ Map<Long,List<CacheEntryBase>> cachesByCluster = new HashMap<>();
+ for(CacheEntryBase entry : caches) {
+ if(entry.classId() == null)
+ continue;
+ cachesByCluster.computeIfAbsent(entry.cluster(this), k -> new ArrayList<>()).add(entry);
+ }
+
+ // Recreate the query data directory from scratch so stale files do not linger.
+ File workspace = Platform.getLocation().toFile();
+ File dir = new File(workspace, "queryData");
+ FileUtils.deleteAll(dir);
+ if(!dir.mkdir() && !dir.isDirectory())
+ throw new IOException("Could not create query data directory " + dir);
+
+ for(Map.Entry<Long,List<CacheEntryBase>> e : cachesByCluster.entrySet()) {
+
+ Long cluster = e.getKey();
+ QuerySerializer serializer = new QuerySerializer(this);
+ int count = 0;
+ // The number of successfully serialized entries is only known afterwards,
+ // so a size placeholder is written first and patched below.
+ int pos = serializer.writeUnknownSize();
+ for(CacheEntryBase entry : e.getValue()) {
+ try {
+ entry.serialize(serializer);
+ count++;
+ } catch (IllegalStateException ex) {
+ // Entry could not be serialized in its current state; skip it.
+ System.err.println(ex.getMessage());
+ }
+ }
+ serializer.setUnknownSize(pos, count);
+
+ System.err.println(serializer.bytes().length + " bytes for cluster " + cluster);
+ FileUtils.writeFile(new File(dir, "" + cluster + ".queryData"), serializer.bytes());
+
+ }
+
+ long end = System.nanoTime();
+
+ System.err.println("saved queries in " + 1e-6*(end-start) + "ms.");
+
+ }
+
+ // Reads previously saved query cache data from the workspace "queryData"
+ // directory and reconstructs the queries. Counterpart of save().
+ public void restore() throws IOException {
+
+ long start = System.nanoTime();
+
+ File workspace = Platform.getLocation().toFile();
+ File dir = new File(workspace, "queryData");
+ // NOTE(review): mkdir() result is ignored — presumably only needed when
+ // nothing has been saved yet; verify a failure here is benign.
+ dir.mkdir();
+
+ // save() writes one <cluster>.queryData file per cluster.
+ for(File f : FileUtils.listFilesByExtension(dir, "queryData")) {
+ byte[] bytes = FileUtils.readFile(f);
+ QueryDeserializer qd = new QueryDeserializer(this, bytes);
+ qd.readHeaders();
+ qd.readQueries();
+ }
+
+ long end = System.nanoTime();
+
+ System.err.println("restored queries in " + 1e-6*(end-start) + "ms.");
+
+ }
+
boolean removeQuery(CacheEntry entry) {
// This entry has been removed before. No need to do anything here.
entry.prepareRecompute(querySupport);
ReadGraphImpl parentGraph = graph.forRecompute(entry);
-
+ parentGraph.asyncBarrier.inc();
query.recompute(parentGraph);
+ parentGraph.asyncBarrier.dec();
if(entry.isExcepted()) return ListenerEntry.NO_VALUE;
}
return result;
}
+
+ /**
+  * Resolves the cluster id of the given resource.
+  *
+  * @param resource resource id; values {@code <= 0} map to cluster 0
+  * @return the cluster id reported by the query support, or 0
+  */
+ public long cluster(int resource) {
+ return resource <= 0 ? 0 : querySupport.getClusterId(resource);
+ }
public void assertDone() {
}
Exception callerException = null;
- public interface AsyncBarrier {
- public void inc();
- public void dec();
- // public void inc(String debug);
- // public void dec(String debug);
- }
+ // Barrier used to track completion of asynchronous operations; inc/dec calls
+ // bracket pending work (see e.g. the recompute path in this file).
+ public interface AsyncBarrier {
+ // Registers one pending asynchronous operation.
+ public void inc();
+ // Signals completion of one previously registered operation.
+ public void dec();
+ // Blocks until the barrier is released for the given request/graph.
+ public void waitBarrier(Object request, ReadGraphImpl impl);
+ // NOTE(review): semantics inferred from name — presumably reports whether
+ // this barrier can block the caller; verify against implementations.
+ public boolean isBlocking();
+ }
// final public QueryProcessor processor;
// final public QuerySupport support;
return L0;
}
+ // Returns the Layer0 instance (field L0) held by this processor.
+ public Layer0 getL0() {
+ return L0;
+ }
+
public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
protected Integer initialValue() {
return -1;