X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=6d34d9341c425bbdd465616af54a12d6e3e62cf4;hb=6326b65c5c6a785aadcb615cb95ceadd8f53964a;hp=08d740f267d55daef3c38a8f205b0a75f7919154;hpb=5bc1c2b950d374ecfcc206981724c3b9286dcbd6;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 08d740f26..6d34d9341 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -39,6 +39,7 @@ import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; import org.simantics.db.MonitorHandler; +import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.ResourceSerializer; import org.simantics.db.Session; @@ -50,13 +51,15 @@ import org.simantics.db.authentication.UserAuthenticationAgent; import org.simantics.db.authentication.UserAuthenticator; import org.simantics.db.common.Indexing; import org.simantics.db.common.TransactionPolicyRelease; -import org.simantics.db.common.procedure.BlockingAsyncProcedure; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; @@ -76,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; @@ -89,7 +93,7 @@ import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; -import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure; import org.simantics.db.impl.query.QueryCache; import org.simantics.db.impl.query.QueryCacheBase; import org.simantics.db.impl.query.QueryProcessor; @@ -412,7 +416,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -550,7 +554,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -636,7 +640,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1375,7 +1379,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1475,7 +1479,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1499,7 +1503,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule //int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, queryProvider2.THREAD_MASK + 1, -1) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override public void run(int thread) { @@ -1617,7 +1621,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override public void run(int thread) { @@ -1643,7 +1647,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule // final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( // procedure, "request"); - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(procedure, request); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, procedure, request); try { @@ -1674,6 +1678,62 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } + public void scheduleRequest(final MultiRead request, final SyncMultiProcedure procedure, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + int sync = notify != null ? thread : -1; + + requestManager.scheduleRead(new SessionRead(null, notify) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + newGraph.processor.query(newGraph, request, null, procedure, listener); + +// newGraph.waitAsync(request); + + } else { + + final ResultCallWrappedSyncQueryProcedure wrapper = new ResultCallWrappedSyncQueryProcedure(procedure); + + try { + + request.perform(newGraph, wrapper); + + } catch (Throwable t) { + + t.printStackTrace(); + + } + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { assert (request != null); @@ -1683,7 +1743,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int sync = notify != null ? thread : -1; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override public void run(int thread) { @@ -1737,7 +1797,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override public void run(int thread) { @@ -3067,21 +3127,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); - syncRequest(request, new AsyncMultiProcedure() { + syncRequest(request, new SyncMultiProcedure() { @Override - public void execute(AsyncReadGraph graph, T t) { + public void execute(ReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override - public void finished(AsyncReadGraph graph) { + public void finished(ReadGraph graph) { } @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { exception.set(t); } @@ -3099,33 +3159,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { + public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) throws DatabaseException { assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); + throw new Error("Not implemented!"); } @Override public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + return syncRequest(request, (SyncMultiProcedure)procedure); } @Override public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3281,38 +3335,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert(request != null); - asyncRequest(request, new AsyncMultiProcedureAdapter() { + asyncRequest(request, new SyncMultiProcedureAdapter() { @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { t.printStackTrace(); } }); } - @Override - public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - @Override public void asyncRequest(MultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + asyncRequest(request, (SyncMultiProcedure)procedure); } @Override public void asyncRequest(MultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + asyncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + scheduleRequest(request, procedure, null); } @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + asyncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3354,17 +3403,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { - assertNotSession(); - throw new Error("Not implemented!"); - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { - throw new Error("Not implemented!"); - } - @Override final public void asyncRequest(final ExternalRead request) { @@ -3524,7 +3562,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public int getAmountOfQueryThreads() { // This must be a power of two - return 16; + return 1; // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); }