X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=030d512eb736e2f6303f0d68c1d0b0e3d2e57b97;hp=d71a8bfc81c491d52dc7afe30cae65a5e8e95d60;hb=90a52d58489b6c518b13dde238923125c8f63eec;hpb=b844ca924d129072e11f7e4c18918dd388a37e8c diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index d71a8bfc8..030d512eb 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -39,6 +39,7 @@ import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; import org.simantics.db.MonitorHandler; +import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.ResourceSerializer; import org.simantics.db.Session; @@ -52,10 +53,13 @@ import org.simantics.db.common.Indexing; import org.simantics.db.common.TransactionPolicyRelease; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; @@ -75,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; @@ -88,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; -import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure; +import org.simantics.db.impl.query.QueryCache; +import org.simantics.db.impl.query.QueryCacheBase; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QueryProcessor.SessionRead; import org.simantics.db.impl.query.QueryProcessor.SessionTask; @@ -136,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; @@ -159,6 +167,7 @@ import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.LoggerFactory; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; @@ -166,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class); + protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; @@ -282,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + serviceLocator.registerService(EventSupport.class, new EventSupportImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); @@ -407,12 +419,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -466,7 +476,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - assert(!queryProvider2.dirty); + assert(!queryProvider2.cache.dirty); } catch (Throwable e) { @@ -499,7 +509,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule // state.cancelCommit(context, clusterStream); // if (!empty) { // if (!context.isOk()) // this is a blocking operation -// throw new InternalException("Cancel failed. This should never happen. Contact application support."); +// throw new InternalException("Cancel failed. This should never happen."); // getQueryProvider2().performDirtyUpdates(writeState.getGraph()); // } // state.cancelCommit2(context, clusterStream); @@ -545,12 +555,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -627,16 +635,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + final ITask total = ThreadLogger.task("ScheduleDelayedWrite"); assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @@ -676,7 +682,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule delayedWriteState = null; - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + ITask task2 = ThreadLogger.task("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; @@ -1370,12 +1376,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1470,12 +1474,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1494,25 +1496,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { try { - newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { + + AsyncProcedure ap = new AsyncProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable t) { @@ -1530,7 +1536,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule procedure.execute(graph, t); } - }, listener); + }; + + QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true); + } catch (Throwable t) { // This is handled by the AsyncProcedure //Logger.defaultLogError("Internal error", t); @@ -1540,8 +1549,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { -// newGraph.state.barrier.inc(); - T t = request.perform(newGraph); try { @@ -1585,9 +1592,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); + + task.finish(); } @@ -1610,13 +1616,87 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + try { + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + } else { + + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { + + public void execute(AsyncReadGraph graph_, T result) { + task.finish(); + super.execute(graph_, result); + } + + public void exception(AsyncReadGraph graph_, Throwable t) { + task.finish(); + super.exception(graph_, t); + } + + }; + + try { + + request.perform(newGraph, wrap); + wrap.dec(); + wrap.get(); + + } catch (DatabaseException e) { + + Logger.defaultLogError(e); + + } + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + public void scheduleRequest(final MultiRead request, final SyncMultiProcedure procedure, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + int sync = notify != null ? thread : -1; + + requestManager.scheduleRead(new SessionRead(null, notify) { + + @Override + public void run0(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { @@ -1629,22 +1709,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } else { - final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( - procedure, "request"); + final ResultCallWrappedSyncQueryProcedure wrapper = new ResultCallWrappedSyncQueryProcedure(procedure); try { -// newGraph.state.barrier.inc(); - request.perform(newGraph, wrapper); -// newGraph.waitAsync(request); - } catch (Throwable t) { - wrapper.exception(newGraph, t); -// newGraph.waitAsync(request); - + t.printStackTrace(); } @@ -1661,7 +1734,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); } - + public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { assert (request != null); @@ -1671,10 +1744,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int sync = notify != null ? thread : -1; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1725,10 +1798,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1740,25 +1813,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { - newGraph.processor.query(newGraph, request, null, new Procedure() { - - @Override - public void exception(Throwable t) { - procedure.exception(t); - if(throwable != null) { - throwable.set(t); - } - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); - } - - }, listener); - -// newGraph.waitAsync(request); + try { + QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } } else { @@ -2434,7 +2493,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { l.graphChanged(e2); } catch (Throwable ex) { - ex.printStackTrace(); + LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex); } } @@ -2443,7 +2502,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t); } } @@ -3048,21 +3107,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); - syncRequest(request, new AsyncMultiProcedure() { + syncRequest(request, new SyncMultiProcedure() { @Override - public void execute(AsyncReadGraph graph, T t) { + public void execute(ReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override - public void finished(AsyncReadGraph graph) { + public void finished(ReadGraph graph) { } @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { exception.set(t); } @@ -3080,33 +3139,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { + public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) throws DatabaseException { assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); + throw new Error("Not implemented!"); } @Override public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + return syncRequest(request, (SyncMultiProcedure)procedure); } @Override public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3141,9 +3194,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + if (t != null) { + if (t instanceof DatabaseException) + throw (DatabaseException) t; + else + throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; @@ -3262,38 +3317,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert(request != null); - asyncRequest(request, new AsyncMultiProcedureAdapter() { + asyncRequest(request, new SyncMultiProcedureAdapter() { @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { t.printStackTrace(); } }); } - @Override - public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - @Override public void asyncRequest(MultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + asyncRequest(request, (SyncMultiProcedure)procedure); } @Override public void asyncRequest(MultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + asyncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + scheduleRequest(request, procedure, null); } @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + asyncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3335,17 +3385,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { - assertNotSession(); - throw new Error("Not implemented!"); - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { - throw new Error("Not implemented!"); - } - @Override final public void asyncRequest(final ExternalRead request) { @@ -3365,28 +3404,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, (Procedure)procedure); } - - - void check(Throwable t) throws DatabaseException { - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - void check(DataContainer container) throws DatabaseException { - Throwable t = container.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - - - - - boolean sameProvider(Write request) { if(writeState.getGraph().provider != null) { return writeState.getGraph().provider.equals(request.getProvider()); @@ -3403,9 +3420,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); + private void assertNotSession() throws DatabaseException { Thread current = Thread.currentThread(); - if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); + if (sessionThreads.contains(current)) + throw new ServiceException("Caller is already inside a transaction."); } void assertAlive() { @@ -3530,8 +3549,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges);