private TObjectIntHashMap<NodeContext> references = new TObjectIntHashMap<NodeContext>();
@Override
- public void incRef(NodeContext context) {
+ synchronized public void incRef(NodeContext context) {
int exist = references.get(context);
references.put(context, exist+1);
}
@Override
- public void decRef(NodeContext context) {
+ synchronized public void decRef(NodeContext context) {
int exist = references.get(context);
references.put(context, exist-1);
if(exist == 1) {
package org.simantics.db.common.request;
import org.simantics.db.AsyncReadGraph;
+import org.simantics.db.exception.DatabaseException;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.request.AsyncRead;
@Override
final public void perform(AsyncReadGraph graph, AsyncProcedure<Object> procedure) {
- run(graph);
- procedure.execute(graph, null);
+ try {
+ run(graph);
+ procedure.execute(graph, null);
+ } catch (DatabaseException e) {
+ procedure.exception(graph, e);
+ }
}
@Override
return hashCode();
}
- public abstract void run(AsyncReadGraph graph);
+ public abstract void run(AsyncReadGraph graph) throws DatabaseException;
}
this.parameter2 = parameter2;
}
+ @Override
+ public String toString() {
+ return getClass().getName() + "[" + parameter + ", " + parameter2 + "]";
+ }
+
}
@Override
public void request(AsyncRequestProcessor processor, AsyncProcedure<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public void request(AsyncRequestProcessor processor, Procedure<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public void request(AsyncRequestProcessor processor, SyncProcedure<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public void request(AsyncRequestProcessor processor, AsyncListener<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public void request(AsyncRequestProcessor processor, Listener<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public void request(AsyncRequestProcessor processor, SyncListener<R> procedure) {
- processor.asyncRequest(this, procedure);
+ processor.asyncRequest(this, procedure);
}
@Override
public R request(RequestProcessor processor) throws DatabaseException {
- return processor.syncRequest(this);
+ return processor.syncRequest(this);
+ }
+
+ @Override
+ public String toString() {
+ return getClass().getName() + "[" + parameter + "]";
}
}
import org.simantics.utils.Development;
import org.simantics.utils.datastructures.Pair;
import org.simantics.utils.datastructures.collections.CollectionUtils;
+import org.simantics.utils.threads.logger.ITask;
+import org.simantics.utils.threads.logger.ThreadLogger;
import org.slf4j.LoggerFactory;
import gnu.trove.map.hash.TObjectIntHashMap;
@Override
public <T> T syncRequest(final Read<T> request) throws DatabaseException {
assert (request != null);
- return (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+
+ ITask task = ThreadLogger.task(request);
+ T result = (T)QueryCache.runnerReadEntry(this, request, parent, null, null, true);
+ task.finish();
+ return result;
+
}
@Override
assert (request != null);
+ ITask task = ThreadLogger.task(request);
ListenerBase listener = procedure != null ? getListenerBase(procedure) : null;
-
- return QueryCache.resultReadEntry(this, request, parent, listener, procedure);
+ T result = QueryCache.resultReadEntry(this, request, parent, listener, procedure);
+ task.finish();
+ return result;
}
assert (request != null);
+ ITask task = ThreadLogger.task(request);
ListenerBase listener = getListenerBase(procedure);
-
-// BlockingAsyncProcedure<T> ap = new BlockingAsyncProcedure<>(this, procedure, request);
- return (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
-// return ap.get();
+ T result = (T)QueryCache.runnerAsyncReadEntry(this, request, parent, listener, procedure, true);
+ task.finish();
+ return result;
}
assert (request != null);
assert (procedure != null);
- processor.schedule(new SessionTask(this) {
+ processor.scheduleNow(new SessionTask(this) {
@Override
public void run0(int thread) {
assert (request != null);
assert (procedure != null);
- processor.schedule(new SessionTask(this) {
+ ITask task = ThreadLogger.task(request);
+
+ processor.scheduleNow(new SessionTask(this) {
@Override
public void run0(int thread) {
try {
final ListenerBase listener = getListenerBase(procedure);
- QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, procedure, false);
+ QueryCache.runnerAsyncReadEntry(ReadGraphImpl.this, request, parent, listener, new AsyncProcedure<T>() {
+
+ @Override
+ public void execute(AsyncReadGraph graph, T result) {
+ task.finish();
+ procedure.execute(graph, result);
+ }
+
+ @Override
+ public void exception(AsyncReadGraph graph, Throwable throwable) {
+ task.finish();
+ procedure.exception(graph, throwable);
+ }
+
+ }, false);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
public boolean performPending() {
return processor.performPending(this);
}
-
+
public Set<ReadGraphImpl> ancestorSet() {
HashSet<ReadGraphImpl> result = new HashSet<>();
ReadGraphImpl g = this;
}
return result;
}
+
+ public int getLevel() {
+ return getLevelStatic(this);
+ }
+
+ private static int getLevelStatic(ReadGraphImpl impl) {
+ if(impl == null) return 0;
+ else return 1 + getLevelStatic(impl.parentGraph);
+ }
+
+ public ReadGraphImpl getTopLevelGraph() {
+ return getTopLevelGraphStatic(this);
+ }
+
+ private static ReadGraphImpl getTopLevelGraphStatic(ReadGraphImpl impl) {
+ if(impl.parentGraph == null) return impl;
+ else return getTopLevelGraphStatic(impl.parentGraph);
+ }
}
DatabaseException exception;
public AsyncTask(ReadGraphImpl graph) {
+ this(graph, 1);
+ }
+
+ public AsyncTask(ReadGraphImpl graph, int pos) {
super(graph);
+ this.position = pos;
+ if(this.position < 1024)
+ this.position *= 2;
}
@Override
}
throw new IllegalStateException("Eternal loop in queries.");
}
- graph.processor.schedule(new AsyncTask(graph));
+ graph.processor.scheduleLater(new AsyncTask(graph, position));
}
}
+ @Override
+ public boolean maybeReady() {
+ return proc.isDone();
+ }
+
}
try {
if(needsToBlock) task.run(0);
else if (proc.isDone()) task.run(0);
else {
- graph.processor.schedule(task);
+ graph.processor.scheduleLater(task);
return null;
}
}
ReadEntry entry = (ReadEntry)cache.getOrCreateReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
+ graph.processor.scheduleNow(new SessionTask(graph) {
@Override
public void run0(int thread) {
try {
}
AsyncReadEntry entry = (AsyncReadEntry)cache.getOrCreateAsyncReadEntry(graph, r, needsToBlock);
if(entry == null) {
- graph.processor.schedule(new SessionTask(graph) {
+ graph.processor.scheduleNow(new SessionTask(graph) {
@Override
public void run0(int thread) {
try {
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.simantics.databoard.Bindings;
private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
private boolean firingListeners = false;
final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
- private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
+ private BlockingQueue<Runnable> tasks = new ArrayBlockingQueue<Runnable>(2048);
private Map<ListenerBase,ListenerEntry> addedEntries = new HashMap<>();
QueryListening(QueryProcessor processor) {
public void sync() {
Semaphore s = new Semaphore(0);
- tasks.add(() -> {
- s.release();
- });
try {
+ tasks.put(() -> {
+ s.release();
+ });
s.acquire();
} catch (Throwable t) {
LOGGER.error("Error while waiting for query dependency management", t);
void registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
- tasks.offer(() -> {
+ try {
+ tasks.put(() -> {
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- LOGGER.error("Error while registering query dependencies", e);
- }
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
- System.out.println(child + " -> " + parent);
+ if (parent != null && !inferred) {
+ try {
+ if(!child.isImmutable(graph))
+ child.addParent(parent);
+ } catch (DatabaseException e) {
+ LOGGER.error("Error while registering query dependencies", e);
+ }
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_DEPENDENCIES, Bindings.BOOLEAN)) {
+ System.out.println(child + " -> " + parent);
+ }
}
}
- }
- if (listener != null)
- registerListener(child, listener, procedure);
+ if (listener != null)
+ registerListener(child, listener, procedure);
- });
+ });
+ } catch (InterruptedException e) {
+ LOGGER.error("Error while registering dependencies", e);
+ }
}
final public Session session;
final public ResourceSupport resourceSupport;
+ final public Semaphore requests = new Semaphore(1);
+
final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
-// public ArrayList<SessionTask>[] queues;
public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
}
public ThreadState[] threadStates;
-// public ReentrantLock[] threadLocks;
-// public Condition[] threadConditions;
-
- //public ArrayList<SessionTask>[] ownTasks;
-
- //public ArrayList<SessionTask>[] ownSyncTasks;
-
- //ArrayList<SessionTask>[] delayQueues;
final Object querySupportLock;
public void close() {
}
- public SessionTask getOwnTask(ReadGraphImpl impl) {
- Set<ReadGraphImpl> ancestors = impl.ancestorSet();
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(ancestors)) {
- return freeScheduling.remove(index);
- }
- index++;
- }
- }
- return null;
- }
-
public SessionTask getSubTask(ReadGraphImpl impl) {
- Set<ReadGraphImpl> onlyThis = Collections.singleton(impl);
synchronized(querySupportLock) {
int index = 0;
while(index < freeScheduling.size()) {
SessionTask task = freeScheduling.get(index);
- if(task.hasCommonParent(onlyThis)) {
+ if(task.hasCommonParent(task.graph, impl) && task.maybeReady()) {
+ queueLength.decrementAndGet();
return freeScheduling.remove(index);
}
index++;
return null;
}
- public boolean performPending(ReadGraphImpl graph) {
- SessionTask task = getOwnTask(graph);
+ /*
+ * We are running errands while waiting for requests to complete.
+ * We can only run work that is part of the current root request to avoid any deadlocks
+ */
+ public boolean performPending(ReadGraphImpl graph) {
+ SessionTask task = getSubTask(graph);
if(task != null) {
task.run(QueryProcessor.thread.get());
return true;
- } else {
- return false;
}
+ return false;
}
-// final public void scheduleOwn(int caller, SessionTask request) {
-// ownTasks[caller].add(request);
-// }
-
- final public void schedule(SessionTask request) {
-
- //int performer = request.thread;
-
-// if(DebugPolicy.SCHEDULE)
-// System.out.println("schedule " + request + " " + " -> " + performer);
+ final public void scheduleNow(SessionTask request) {
+ schedule(request, false);
+ }
- //assert(performer >= 0);
+ final public void scheduleLater(SessionTask request) {
+ schedule(request, true);
+ }
+ AtomicInteger queueLength = new AtomicInteger(0);
+
+ final public void schedule(SessionTask request, boolean late) {
+
+ int queueLengthEstimate = queueLength.get();
+ if(!late && queueLengthEstimate > 80) {
+ request.run(thread.get());
+ return;
+ }
+
assert(request != null);
+
+ synchronized(querySupportLock) {
-// if(caller == performer) {
-// request.run(caller);
-// } else {
-
-// if(performer == THREADS) {
-
- synchronized(querySupportLock) {
-
- if(BarrierTracing.BOOKKEEPING) {
- Exception current = new Exception();
- Exception previous = BarrierTracing.tasks.put(request, current);
- if(previous != null) {
- previous.printStackTrace();
- current.printStackTrace();
- }
- }
-
- freeScheduling.add(request);
-
- querySupportLock.notifyAll();
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
- }
+ if(late) {
+ int pos = request.position - 1;
+ if(pos < freeScheduling.size()) {
+ freeScheduling.add(pos, request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ freeScheduling.addLast(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ }
+ }
+ else {
+ if(request.getLevel() < 4) {
+ if(freeScheduling.size() < 100) {
+ freeScheduling.addFirst(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ request.run(thread.get());
+ }
+ } else {
+ if(freeScheduling.size() < 20) {
+ freeScheduling.addFirst(request);
+ queueLength.incrementAndGet();
+ requests.release();
+ } else {
+ request.run(thread.get());
+ }
+ }
+ }
- return;
-
-// }
-//
-// ReentrantLock queueLock = threadLocks[performer];
-// queueLock.lock();
-// queues[performer].add(request);
-// // This thread could have been sleeping
-// if(queues[performer].size() == 1) {
-// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-// threadConditions[performer].signalAll();
-// }
-// queueLock.unlock();
-// }
+
+ }
}
public static abstract class SessionTask {
public final ReadGraphImpl graph;
- private Set<ReadGraphImpl> ancestors;
private int counter = 0;
+ protected int position = 1;
private Exception trace;
public SessionTask(ReadGraphImpl graph) {
if(graph != null) graph.asyncBarrier.inc();
}
- public boolean hasCommonParent(Set<ReadGraphImpl> otherAncestors) {
- if(graph == null) return false;
- if(ancestors == null) ancestors = graph.ancestorSet();
- return !Collections.disjoint(ancestors, otherAncestors);
- }
+ public static boolean hasCommonParent(ReadGraphImpl r1, ReadGraphImpl r2) {
+ if(r1 == null || r2 == null) return false;
+ return r1.getTopLevelGraph() == r2.getTopLevelGraph();
+ }
- public abstract void run0(int thread);
+ public abstract void run0(int thread);
public final void run(int thread) {
if(counter++ > 0) {
run0(thread);
if(graph != null) graph.asyncBarrier.dec();
}
+
+ public boolean maybeReady() {
+ return true;
+ }
@Override
public String toString() {
- return "SessionTask[" + graph.parent + "]";
+ if(graph == null)
+ return "SessionTask[no graph]";
+ else
+ return "SessionTask[" + graph.parent + "]";
+ }
+
+ public int getLevel() {
+ if(graph == null) return 0;
+ else return graph.getLevel();
}
}
@Override
protected void doDispose() {
+ requests.release(Integer.MAX_VALUE / 2);
+
for(int index = 0; index < THREADS; index++) {
executors[index].dispose();
}
private Session session;
private QuerySupport querySupport;
private final QueryProcessor processor;
+ private final Semaphore requests;
final private ArrayList<SessionTask> tasks = new ArrayList<SessionTask>();
// final private ArrayList<SessionTask> own;
sleepers = processor.sleepers;
querySupport = processor.querySupport;
threadStates = processor.threadStates;
+ requests = processor.requests;
// delayQueues = processor.delayQueues;
// executors = processor.executors;
// threadLocks = processor.threadLocks;
private boolean pumpTask() {
if(!processor.freeScheduling.isEmpty()) {
tasks.add(processor.freeScheduling.removeFirst());
+ processor.queueLength.decrementAndGet();
return true;
}
return false;
threadStates[index] = ThreadState.SLEEP;
- synchronized (querySupportLock) {
- querySupportLock.wait(100);
-
- }
-
-// boolean woken = condition.await(10, TimeUnit.MILLISECONDS);
-// if(!woken) {
-// synchronized (querySupportLock) {
-// if(!processor.freeScheduling.isEmpty())
-// System.err.println("some tasks are available!");
-// }
-// }
+ requests.acquire();
sleepers.decrementAndGet();
CollectionSupport coll = graph.getService(CollectionSupport.class);
THashSet<Resource> visited = new THashSet<>();
- List<Resource> rec = findRec(graph, index, filter, visited);
+ List<Resource> rec_ = findRec(graph, index, filter, visited);
+ // We must not modify rec_!
+ List<Resource> rec = rec_;
for(Resource global : Layer0Utils.listGlobalOntologies(graph)) {
if(!visited.add(global)) continue;
List<Resource> rs = graph.syncRequest(new QueryIndex(global, type, filter), TransientCacheListener.<List<Resource>>instance());
- if(rec.isEmpty() && !rs.isEmpty()) {
- // TODO: rec could be an immutable empty list
- rec = new ArrayList<Resource>();
+ if(!rs.isEmpty()) {
+ if(rec == rec_)
+ rec = new ArrayList<>(rec);
+ rec.addAll(rs);
}
- rec.addAll(rs);
}
Collection<Resource> result = coll.asSortedList(rec);
return result;
@Override
public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
- final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+ final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
assert (request != null);
delayedWriteState = null;
- ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+ ITask task2 = ThreadLogger.task("DelayedWriteCommit");
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
assert (request != null);
assert (procedure != null);
- //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
ListenerBase listener = getListenerBase(procedure);
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
-// newGraph.state.barrier.inc();
-
T t = request.perform(newGraph);
try {
}
}
-
-// newGraph.state.barrier.dec();
-// newGraph.waitAsync(request);
+
+ task.finish();
}
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
try {
QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
- //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
- //newGraph.processor.query(newGraph, request, null, procedure, listener);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
} else {
-// final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-// procedure, "request");
-
- BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request) {
+
+ public void execute(AsyncReadGraph graph_, T result) {
+ task.finish();
+ super.execute(graph_, result);
+ }
+
+ public void exception(AsyncReadGraph graph_, Throwable t) {
+ task.finish();
+ super.exception(graph_, t);
+ }
+
+ };
try {
public synchronized void startRead(int thread, final SessionRead task) {
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
public synchronized void startReadUpdate(int thread) {
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
public synchronized void startWrite(int thread, final SessionTask task) {
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
public synchronized void startWriteUpdate(int thread) {
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
if (!reads.isEmpty()) {
final SessionRead read = reads.poll();
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
assert(State.INIT != state);
if(State.READ == state) {
- session.queryProvider2.schedule(new SessionTask(null) {
+ session.queryProvider2.scheduleNow(new SessionTask(null) {
@Override
public void run0(int thread) {
// keep their order the same as in the ordered set.
final int elementIndex = index.getAndIncrement();
result.elements.add(component);
-
- graph.forTypes(component, new ProcedureAdapter<Set<Resource>>() {
+
+ graph.asyncRequest(new org.simantics.db.common.primitiverequest.Types(component), new ProcedureAdapter<Set<Resource>>() {
@Override
public void execute(Set<Resource> types) {
this.removedRouteGraphConnections.clear();
}
- void processNodes(ReadGraph graph) throws DatabaseException {
+ void processNodes(AsyncReadGraph graph) throws DatabaseException {
for (Map.Entry<Resource, Change> entry : changes.elements.entrySet()) {
}
};
- graph.syncRequest(new ConnectionRequest(canvas, diagram, element, errorHandler, loadListener), new AsyncProcedure<IElement>() {
+ graph.asyncRequest(new ConnectionRequest(canvas, diagram, element, errorHandler, loadListener), new AsyncProcedure<IElement>() {
@Override
public void execute(AsyncReadGraph graph, final IElement e) {
};
//System.out.println("NODE REQUEST: " + element);
- graph.syncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
+ graph.asyncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
@Override
public void execute(AsyncReadGraph graph, IElement e) {
}
return assertMappedConnection(connection);
}
- void processBranchPoints(ReadGraph graph) throws DatabaseException {
+ void processBranchPoints(AsyncReadGraph graph) throws DatabaseException {
for (Map.Entry<Resource, Change> entry : changes.branchPoints.entrySet()) {
final Resource element = entry.getKey();
IElement mappedElement = getMappedElement(element);
if (mappedElement == null) {
if (DebugPolicy.DEBUG_NODE_LOAD)
- graph.syncRequest(new ReadRequest() {
+ graph.asyncRequest(new ReadRequest() {
@Override
public void run(ReadGraph graph) throws DatabaseException {
System.out.println(" EXTERNALLY ADDED BRANCH POINT: "
}
};
- graph.syncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
+ graph.asyncRequest(new NodeRequest(canvas, diagram, element, loadListener), new AsyncProcedure<IElement>() {
@Override
public void execute(AsyncReadGraph graph, IElement e) {
}
}
}
- void processConnectionSegments(ReadGraph graph) throws DatabaseException {
+ void processConnectionSegments(AsyncReadGraph graph) throws DatabaseException {
ConnectionSegmentAdapter adapter = connectionSegmentAdapter;
for (Map.Entry<EdgeResource, Change> entry : changes.connectionSegments.entrySet()) {
IElement mappedElement = getMappedElement(seg);
if (mappedElement == null) {
if (DebugPolicy.DEBUG_EDGE_LOAD)
- graph.syncRequest(new ReadRequest() {
+ graph.asyncRequest(new ReadRequest() {
@Override
public void run(ReadGraph graph) throws DatabaseException {
System.out.println(" EXTERNALLY ADDED CONNECTION SEGMENT: " + seg.toString()
}
});
- graph.syncRequest(new EdgeRequest(GraphToDiagramSynchronizer.this, canvas, errorHandler, canvasListenerSupport, diagram, adapter, seg), new AsyncProcedure<IElement>() {
+ graph.asyncRequest(new EdgeRequest(GraphToDiagramSynchronizer.this, canvas, errorHandler, canvasListenerSupport, diagram, adapter, seg), new AsyncProcedure<IElement>() {
@Override
public void execute(AsyncReadGraph graph, IElement e) {
if (DebugPolicy.DEBUG_EDGE_LOAD)
case REMOVED: {
final IElement e = getMappedElement(seg);
if (DebugPolicy.DEBUG_EDGE_LOAD)
- graph.syncRequest(new ReadRequest() {
+ graph.asyncRequest(new ReadRequest() {
@Override
public void run(ReadGraph graph) throws DatabaseException {
System.out.println(" EXTERNALLY REMOVED CONNECTION SEGMENT: " + seg.toString() + " - "
if (changes.isEmpty())
return;
+ ITask threadLog = ThreadLogger.task("processNodes");
+
// NOTE: This order is important.
Object task = Timing.BEGIN("processNodesConnections");
//System.out.println("---- PROCESS NODES & CONNECTIONS BEGIN");
if (!changes.elements.isEmpty()) {
- graph.syncRequest(new ReadRequest() {
+ graph.syncRequest(new AsyncReadRequest() {
@Override
- public void run(ReadGraph graph) throws DatabaseException {
+ public void run(AsyncReadGraph graph) throws DatabaseException {
processNodes(graph);
}
@Override
}
//System.out.println("---- PROCESS NODES & CONNECTIONS END");
+ threadLog.finish();
+
+ threadLog = ThreadLogger.task("processConnections");
+
processConnections();
+ threadLog.finish();
+
+ threadLog = ThreadLogger.task("processBranchPoints");
+
//System.out.println("---- PROCESS BRANCH POINTS BEGIN");
if (!changes.branchPoints.isEmpty()) {
- graph.syncRequest(new ReadRequest() {
+ graph.syncRequest(new AsyncReadRequest() {
@Override
- public void run(ReadGraph graph) throws DatabaseException {
+ public void run(AsyncReadGraph graph) throws DatabaseException {
processBranchPoints(graph);
}
@Override
}
//System.out.println("---- PROCESS BRANCH POINTS END");
+ threadLog.finish();
+
Timing.END(task);
+
+ threadLog = ThreadLogger.task("processConnectionSegments");
+
task = Timing.BEGIN("processConnectionSegments");
//System.out.println("---- PROCESS CONNECTION SEGMENTS BEGIN");
if (!changes.connectionSegments.isEmpty()) {
- graph.syncRequest(new ReadRequest() {
+ graph.syncRequest(new AsyncReadRequest() {
@Override
- public void run(ReadGraph graph) throws DatabaseException {
+ public void run(AsyncReadGraph graph) throws DatabaseException {
processConnectionSegments(graph);
}
@Override
}
//System.out.println("---- PROCESS CONNECTION SEGMENTS END");
+ threadLog.finish();
+
Timing.END(task);
+ threadLog = ThreadLogger.task("processRouteGraphConnections");
+
task = Timing.BEGIN("processRouteGraphConnections");
if (!changes.routeGraphConnections.isEmpty()) {
graph.syncRequest(new ReadRequest() {
}
Timing.END(task);
+ threadLog.finish();
+
//System.out.println("---- AFTER LOADING");
//for (IElement e : addedElements)
// System.out.println(" ADDED ELEMENT: " + e);
// System.out.println(" ADDED BRANCH POINTS: " + e);
task = Timing.BEGIN("executeDeferredLoaders");
+ threadLog = ThreadLogger.task("executeDeferredLoaders");
+
executeDeferredLoaders(graph);
+
+ threadLog.finish();
+
Timing.END(task);
}
public class ThreadLogVisualizer {
// Do not show tasks shorter than 5ms
- final public static long DISCARD_LIMIT = 0;
+ final public static long DISCARD_LIMIT = 2 * 1000 * 1000;
// 1s columns
final public static long COLUMN_WIDTH = 1000000000;
+ final int onlyOneThread = -1;
+
+ final String colors[] = {
+ "#53c0a7",
+ "#ca49a1",
+ "#64b74e",
+ "#a159ca",
+ "#b6b345",
+ "#656bc5",
+ "#da943b",
+ "#5e99d3",
+ "#d1592b",
+ "#418a53",
+ "#e16182",
+ "#777d34",
+ "#ca89ca",
+ "#b7754c",
+ "#9e4b6b",
+ "#cb4347"
+ };
+
class Task implements Comparable<Task> {
String name;
long beginTime;
long endTime;
long threadId;
+ long combined = 0;
public Task(String name, long threadId, long beginTime, long endTime) {
+ if(name.length() > 100) name = name.substring(0, 99);
this.name = name;
this.threadId = threadId;
this.beginTime = beginTime;
ArrayList<Task> tasks = new ArrayList<Task>();
+ private boolean acceptThread(long threadId) {
+ if(onlyOneThread == -1) return true;
+ else return (threadId&15) == onlyOneThread;
+ }
+
+ private Map<Long, Task> compositeTasks = new HashMap<>();
+
public void read(DataInput input) {
try {
while(true) {
try {
- String taskName = input.readUTF();
+ String taskName = input.readUTF();
long threadId = input.readLong();
long beginTime = input.readLong();
long endTime = input.readLong();
- if((endTime-beginTime) > DISCARD_LIMIT)
- tasks.add(new Task(taskName, threadId, beginTime, endTime));
+ if(!acceptThread(threadId)) continue;
+ if((endTime-beginTime) > DISCARD_LIMIT) {
+ tasks.add(new Task(taskName, threadId, beginTime, endTime));
+ Task t = compositeTasks.remove(threadId);
+ if(t != null) {
+ if((t.endTime-t.beginTime) > DISCARD_LIMIT) {
+ tasks.add(new Task(t.combined + " small tasks", t.threadId, t.beginTime, t.endTime));
+ }
+ }
+ } else {
+ Task t = compositeTasks.get(threadId);
+ if(t == null) {
+ t = new Task("", threadId, beginTime, endTime);
+ compositeTasks.put(threadId, t);
+ }
+ if(beginTime - t.endTime > DISCARD_LIMIT) {
+ tasks.add(new Task(t.combined + " small tasks", t.threadId, t.beginTime, t.endTime));
+ t = new Task("", threadId, beginTime, endTime);
+ compositeTasks.put(threadId, t);
+ }
+ t.endTime = endTime;
+ t.combined++;
+ if((t.endTime-t.beginTime) > DISCARD_LIMIT) {
+ tasks.add(new Task(t.combined + " small tasks", t.threadId, t.beginTime, t.endTime));
+ compositeTasks.remove(threadId);
+ }
+ }
} catch(EOFException e) {
break;
}
(task.beginTime-minTime)*timeScale,
(r+1)*rowHeight);
s.printf(locale,
- "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"green\"/>\n",
+ "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"" + colors[(int)task.threadId & 15] + "\"/>\n",
(task.beginTime-minTime)*timeScale,
r*rowHeight,
(task.endTime-task.beginTime)*timeScale,
rowHeight);
}
for(Task task : lane.tasks) {
+ int time = (int)(1e-6 * (task.endTime-task.beginTime));
s.printf(locale,
"<text x=\"%f\" y=\"%f\">%s</text>\n",
(task.endTime-minTime)*timeScale,
(r+0.8)*rowHeight,
- task.name);
+ Integer.toString(time) + "ms: " + task.name);
}
}
s.println("</svg>");
(task.beginTime-minTime)*timeScale,
(r+1)*rowHeight);
s.printf(locale,
- "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"green\"/>\n",
+ "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"" + colors[(int)task.threadId & 15] + "\"/>\n",
(task.beginTime-minTime)*timeScale,
r*rowHeight,
(task.endTime-task.beginTime)*timeScale,
(task.beginTime-minTime)*timeScale,
(r+1)*rowHeight);
s.printf(locale,
- "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"green\"/>\n",
+ "<rect x=\"%f\" y=\"%f\" width=\"%f\" height=\"%f\" fill=\"" + colors[(int)task.threadId & 15] + "\"/>\n",
(task.beginTime-minTime)*timeScale,
r*rowHeight,
(task.endTime-task.beginTime)*timeScale,
public class ThreadLogger implements IThreadLogger {
- public static String LOG_FILE = "d:\\threads.log";
+ final private static ITask DUMMY = () -> {};
+
+ public static String LOG_FILE = "threads.log";
public static boolean LOG = false;
synchronized (loggerCreationLock) {
if(logger == null)
logger = new ThreadLogger();
- }
+ }
}
return logger;
}
public ITask begin(String taskName) {
return new Task(taskName);
}
+
+ final public static ITask task(Object taskName) {
+ if(LOG)
+ return getInstance().begin(taskName.toString());
+ else
+ return DUMMY;
+ }
}