X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=b98b4dec643383d163cd05746427db59b6c523ea;hb=0db1af75cd14f99f16c42c920562708abf37be93;hp=30e0dde5244de43dbd11ff9582015dad0d831f76;hpb=4aba159170fc72d39c2f930ea224aa71f4cdc2e7;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 30e0dde52..b98b4dec6 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -143,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; @@ -166,6 +167,7 @@ import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.LoggerFactory; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; @@ -173,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class); + protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; @@ -289,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + serviceLocator.registerService(EventSupport.class, new EventSupportImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); @@ -414,12 +419,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -552,12 +555,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -638,12 +639,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @@ -1377,12 +1376,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1477,12 +1474,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(true) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1506,7 +1501,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1514,6 +1509,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { @@ -1624,7 +1622,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1647,7 +1645,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule // final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( // procedure, "request"); - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, procedure, request); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph.asyncBarrier, newGraph, procedure, request); try { @@ -1686,7 +1684,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1742,7 +1740,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1796,7 +1794,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -2488,7 +2486,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { l.graphChanged(e2); } catch (Throwable ex) { - ex.printStackTrace(); + LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex); } } @@ -2497,7 +2495,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t); } } @@ -3189,9 +3187,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + if (t != null) { + if (t instanceof DatabaseException) + throw (DatabaseException) t; + else + throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; @@ -3397,28 +3397,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, (Procedure)procedure); } - - - void check(Throwable t) throws DatabaseException { - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - void check(DataContainer container) throws DatabaseException { - Throwable t = container.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - - - - - boolean sameProvider(Write request) { if(writeState.getGraph().provider != null) { return writeState.getGraph().provider.equals(request.getProvider()); @@ -3435,9 +3413,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); + private void assertNotSession() throws DatabaseException { Thread current = Thread.currentThread(); - if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); + if (sessionThreads.contains(current)) + throw new ServiceException("Caller is already inside a transaction."); } void assertAlive() { @@ -3562,8 +3542,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges);