X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=22d0f316545762d4937b384bea9fd648e8bdd441;hb=b402468a5f6a74cce769eedf1d9f9b531b123842;hp=62b7e382522e78cf726c589507824ec7df95242d;hpb=e88be95edf1f80781646cfdf717ec1b663264179;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 62b7e3825..22d0f3165 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -143,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; @@ -166,6 +167,7 @@ import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.LoggerFactory; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; @@ -173,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class); + protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; @@ -289,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + serviceLocator.registerService(EventSupport.class, new EventSupportImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); @@ -417,7 +422,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -553,7 +558,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -630,14 +635,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + final ITask total = ThreadLogger.task("ScheduleDelayedWrite"); assert (request != null); requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @@ -677,7 +682,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule delayedWriteState = null; - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + ITask task2 = ThreadLogger.task("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; @@ -1374,7 +1379,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1472,7 +1477,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1491,19 +1496,22 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - //int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { @@ -1541,8 +1549,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { -// newGraph.state.barrier.inc(); - T t = request.perform(newGraph); try { @@ -1586,9 +1592,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); + + task.finish(); } @@ -1614,10 +1619,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { @@ -1625,29 +1632,31 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { try { - QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); - //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true); - //newGraph.processor.query(newGraph, request, null, procedure, listener); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + LOGGER.error("Unhandled query exception", e); + } } else { -// final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( -// procedure, "request"); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph.asyncBarrier, newGraph, procedure, request); + public void execute(AsyncReadGraph graph_, T result) { + task.finish(); + super.execute(graph_, result); + } - try { + public void exception(AsyncReadGraph graph_, Throwable t) { + task.finish(); + super.exception(graph_, t); + } - request.perform(newGraph, wrap); - wrap.get(); + }; + try { + wrap.performSync(request); } catch (DatabaseException e) { - - Logger.defaultLogError(e); - + LOGGER.error("Unhandled query exception", e); } } @@ -1676,7 +1685,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1732,7 +1741,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1786,7 +1795,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -2478,7 +2487,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { l.graphChanged(e2); } catch (Throwable ex) { - ex.printStackTrace(); + LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex); } } @@ -2487,7 +2496,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t); } } @@ -3534,8 +3543,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges);