X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=6d34d9341c425bbdd465616af54a12d6e3e62cf4;hb=6326b65c5c6a785aadcb615cb95ceadd8f53964a;hp=8bbccd080512af308765c0a15c1a413526c5acf1;hpb=1e957fc9da518f3bef8a2c19cad72772087e1b6a;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 8bbccd080..6d34d9341 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -39,6 +39,7 @@ import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; import org.simantics.db.MonitorHandler; +import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.ResourceSerializer; import org.simantics.db.Session; @@ -52,10 +53,13 @@ import org.simantics.db.common.Indexing; import org.simantics.db.common.TransactionPolicyRelease; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; @@ -75,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; @@ -88,8 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; -import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure; import org.simantics.db.impl.query.QueryCache; +import org.simantics.db.impl.query.QueryCacheBase; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QueryProcessor.SessionRead; import org.simantics.db.impl.query.QueryProcessor.SessionTask; @@ -410,7 +416,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -548,7 +554,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -634,7 +640,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1373,7 +1379,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1473,7 +1479,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(true) { @Override public void run(int thread) { @@ -1495,9 +1501,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; + //int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override public void run(int thread) { @@ -1534,7 +1540,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }; - QueryCache.runnerReadEntry(newGraph, request, null, listener, ap); + QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true); } catch (Throwable t) { // This is handled by the AsyncProcedure @@ -1615,7 +1621,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override public void run(int thread) { @@ -1629,7 +1635,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { try { - QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure); + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true); //newGraph.processor.query(newGraph, request, null, procedure, listener); } catch (DatabaseException e) { Logger.defaultLogError(e); @@ -1637,20 +1644,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } else { - final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( - procedure, "request"); - - try { +// final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( +// procedure, "request"); -// newGraph.state.barrier.inc(); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, procedure, request); - request.perform(newGraph, wrapper); + try { -// newGraph.waitAsync(request); + request.perform(newGraph, wrap); + wrap.get(); } catch (Throwable t) { - wrapper.exception(newGraph, t); + wrap.exception(newGraph, t); + +// wrapper.exception(newGraph, t); // newGraph.waitAsync(request); @@ -1670,6 +1678,62 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } + public void scheduleRequest(final MultiRead request, final SyncMultiProcedure procedure, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + int sync = notify != null ? thread : -1; + + requestManager.scheduleRead(new SessionRead(null, notify) { + + @Override + public void run(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ListenerBase listener = getListenerBase(procedure); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + + try { + + if (listener != null) { + + newGraph.processor.query(newGraph, request, null, procedure, listener); + +// newGraph.waitAsync(request); + + } else { + + final ResultCallWrappedSyncQueryProcedure wrapper = new ResultCallWrappedSyncQueryProcedure(procedure); + + try { + + request.perform(newGraph, wrapper); + + } catch (Throwable t) { + + t.printStackTrace(); + + } + + } + + } finally { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { assert (request != null); @@ -1679,7 +1743,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int sync = notify != null ? thread : -1; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override public void run(int thread) { @@ -1733,7 +1797,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override public void run(int thread) { @@ -1748,23 +1812,30 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { - newGraph.processor.query(newGraph, request, null, new Procedure() { - - @Override - public void exception(Throwable t) { - procedure.exception(t); - if(throwable != null) { - throwable.set(t); - } - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); - } - - }, listener); + try { + QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + +// newGraph.processor.query(newGraph, request, null, new Procedure() { +// +// @Override +// public void exception(Throwable t) { +// procedure.exception(t); +// if(throwable != null) { +// throwable.set(t); +// } +// } +// +// @Override +// public void execute(T t) { +// if(result != null) result.set(t); +// procedure.execute(t); +// } +// +// }, listener); // newGraph.waitAsync(request); @@ -3056,21 +3127,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); - syncRequest(request, new AsyncMultiProcedure() { + syncRequest(request, new SyncMultiProcedure() { @Override - public void execute(AsyncReadGraph graph, T t) { + public void execute(ReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override - public void finished(AsyncReadGraph graph) { + public void finished(ReadGraph graph) { } @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { exception.set(t); } @@ -3088,33 +3159,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { + public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) throws DatabaseException { assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); + throw new Error("Not implemented!"); } @Override public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + return syncRequest(request, (SyncMultiProcedure)procedure); } @Override public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3270,38 +3335,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert(request != null); - asyncRequest(request, new AsyncMultiProcedureAdapter() { + asyncRequest(request, new SyncMultiProcedureAdapter() { @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { t.printStackTrace(); } }); } - @Override - public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - @Override public void asyncRequest(MultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + asyncRequest(request, (SyncMultiProcedure)procedure); } @Override public void asyncRequest(MultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + asyncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + scheduleRequest(request, procedure, null); } @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + asyncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3343,17 +3403,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { - assertNotSession(); - throw new Error("Not implemented!"); - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { - throw new Error("Not implemented!"); - } - @Override final public void asyncRequest(final ExternalRead request) { @@ -3513,7 +3562,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public int getAmountOfQueryThreads() { // This must be a power of two - return 4; + return 1; // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); }