X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=242fcfe3de8838867d4f7a8fe59ccdaa95eaa0ea;hb=46c8189fee24a71d1ce0634e12db5a696e952b10;hp=80dc3b9bbbbea42e31c1ee3fdc09f613e4753ec1;hpb=62201fffbf97905cd7cce3b68458f519a64a26d3;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 80dc3b9bb..242fcfe3d 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -27,12 +27,14 @@ import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ChangeSet; import org.simantics.db.DevelopmentKeys; +import org.simantics.db.Disposable; import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; @@ -87,6 +89,8 @@ import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.query.QueryCache; +import org.simantics.db.impl.query.QueryCacheBase; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QueryProcessor.SessionRead; import org.simantics.db.impl.query.QueryProcessor.SessionTask; @@ -155,7 +159,6 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; @@ -324,31 +327,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private Object result; final Semaphore sema = new Semaphore(0); private Throwable throwable = null; - final Callback callback = new Callback() { - @Override - public void run(DatabaseException e) { - synchronized (TaskHelper.this) { - throwable = e; - } + final Consumer callback = e -> { + synchronized (TaskHelper.this) { + throwable = e; } }; final Procedure proc = new Procedure() { @Override public void execute(Object result) { - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (t instanceof DatabaseException) - callback.run((DatabaseException)t); + callback.accept((DatabaseException)t); else - callback.run(new DatabaseException("" + name + "operation failed.", t)); + callback.accept(new DatabaseException("" + name + "operation failed.", t)); } }; final WriteTraits writeTraits = new WriteTraits() {}; TaskHelper(String name) { this.name = name; } + @SuppressWarnings("unchecked") T getResult() { return (T)result; } @@ -387,7 +388,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return null; } - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -395,7 +396,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify, Boolean combine) { assert (request != null); @@ -431,6 +432,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -440,12 +442,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -466,7 +468,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - assert(!queryProvider2.dirty); + assert(!queryProvider2.cache.dirty); } catch (Throwable e) { @@ -557,6 +559,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -616,7 +619,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -624,7 +627,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); @@ -642,13 +645,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { if (callback != null) - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (callback != null) { - if (t instanceof DatabaseException) callback.run((DatabaseException) t); - else callback.run(new DatabaseException(t)); + if (t instanceof DatabaseException) callback.accept((DatabaseException) t); + else callback.accept(new DatabaseException(t)); } else Logger.defaultLogError("Unhandled exception", t); } @@ -665,6 +668,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } catch (Throwable e) { delayedWriteState.except(e); total.finish(); + dwg.close(); return; } finally { // newGraph.state.barrier.dec(); @@ -678,6 +682,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -848,14 +853,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private void maintainCluster(ClusterImpl before, ClusterI after_) { if(after_ != null && after_ != before) { ClusterImpl after = (ClusterImpl)after_; - if(currentCluster == before) currentCluster = after; + if(currentCluster == before) { + currentCluster = after; + } clusterTable.replaceCluster(after); } } public int createResourceKey(int foreignCounter) throws DatabaseException { - if(currentCluster == null) + if(currentCluster == null) { currentCluster = getNewResourceCluster(); + } if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); newCluster.foreignLookup = new byte[foreignCounter]; @@ -1289,11 +1297,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { - int thread = request.hashCode() & queryProvider2.THREAD_MASK; + //int thread = request.hashCode() & queryProvider2.THREAD_MASK; fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1354,12 +1363,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @Override - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify, Boolean combine) { assertAlive(); @@ -1379,6 +1388,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1392,12 +1402,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -1420,7 +1430,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(!(e instanceof CancelTransactionException)) { if (callback != null) - callback.run(new DatabaseException(e)); + callback.accept(new DatabaseException(e)); } writeState.except(e); @@ -1506,7 +1516,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { try { - newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { + + AsyncProcedure ap = new AsyncProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable t) { @@ -1524,7 +1535,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule procedure.execute(graph, t); } - }, listener); + }; + + QueryCache.runnerReadEntry(newGraph, request, null, listener, ap); + } catch (Throwable t) { // This is handled by the AsyncProcedure //Logger.defaultLogError("Internal error", t); @@ -1617,9 +1631,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { - newGraph.processor.query(newGraph, request, null, procedure, listener); - -// newGraph.waitAsync(request); + try { + QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure); + //newGraph.processor.query(newGraph, request, null, procedure, listener); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } } else { @@ -1734,23 +1751,30 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { - newGraph.processor.query(newGraph, request, null, new Procedure() { - - @Override - public void exception(Throwable t) { - procedure.exception(t); - if(throwable != null) { - throwable.set(t); - } - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); - } - - }, listener); + try { + QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } + + +// newGraph.processor.query(newGraph, request, null, new Procedure() { +// +// @Override +// public void exception(Throwable t) { +// procedure.exception(t); +// if(throwable != null) { +// throwable.set(t); +// } +// } +// +// @Override +// public void execute(T t) { +// if(result != null) result.set(t); +// procedure.execute(t); +// } +// +// }, listener); // newGraph.waitAsync(request); @@ -1959,14 +1983,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - - @Override - public void run(DatabaseException e) { - exception.set(e); - } - - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2060,12 +2077,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2076,12 +2088,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2113,7 +2120,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final Write request, final Callback callback) { + public void asyncRequest(final Write request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2141,7 +2148,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final DelayedWrite request, final Callback callback) { + public void asyncRequest(final DelayedWrite request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2158,7 +2165,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final WriteOnly request, final Callback callback) { + public void asyncRequest(final WriteOnly request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2282,16 +2289,16 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return true; } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2305,24 +2312,24 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2336,11 +2343,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } @@ -3516,7 +3523,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public int getAmountOfQueryThreads() { // This must be a power of two - return 1; + return 4; // return Integer.highestOneBit(Runtime.getRuntime().availableProcessors()); }