X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=d71a8bfc81c491d52dc7afe30cae65a5e8e95d60;hp=c286f02c7d4434e69fe31fc0978ef057e16764d8;hb=b844ca924d129072e11f7e4c18918dd388a37e8c;hpb=edbb4df64407826271ee6423451401684e9cd68c diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index c286f02c7..d71a8bfc8 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -27,12 +27,14 @@ import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ChangeSet; import org.simantics.db.DevelopmentKeys; +import org.simantics.db.Disposable; import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; @@ -155,7 +157,6 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; @@ -324,31 +325,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private Object result; final Semaphore sema = new Semaphore(0); private Throwable throwable = null; - final Callback callback = new Callback() { - @Override - public void run(DatabaseException e) { - synchronized (TaskHelper.this) { - throwable = e; - } + final Consumer callback = e -> { + synchronized (TaskHelper.this) { + throwable = e; } }; final Procedure proc = new Procedure() { @Override public void execute(Object result) { - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (t instanceof DatabaseException) - callback.run((DatabaseException)t); + callback.accept((DatabaseException)t); else - callback.run(new DatabaseException("" + name + "operation failed.", t)); + callback.accept(new DatabaseException("" + name + "operation failed.", t)); } }; final WriteTraits writeTraits = new WriteTraits() {}; TaskHelper(String name) { this.name = name; } + @SuppressWarnings("unchecked") T getResult() { return (T)result; } @@ -387,7 +386,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return null; } - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -395,7 +394,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify, Boolean combine) { assert (request != null); @@ -431,6 +430,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -440,12 +440,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -557,6 +557,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -616,7 +617,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -624,7 +625,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); @@ -642,13 +643,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { if (callback != null) - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (callback != null) { - if (t instanceof DatabaseException) callback.run((DatabaseException) t); - else callback.run(new DatabaseException(t)); + if (t instanceof DatabaseException) callback.accept((DatabaseException) t); + else callback.accept(new DatabaseException(t)); } else Logger.defaultLogError("Unhandled exception", t); } @@ -679,6 +680,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -849,14 +851,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private void maintainCluster(ClusterImpl before, ClusterI after_) { if(after_ != null && after_ != before) { ClusterImpl after = (ClusterImpl)after_; - if(currentCluster == before) currentCluster = after; + if(currentCluster == before) { + currentCluster = after; + } clusterTable.replaceCluster(after); } } public int createResourceKey(int foreignCounter) throws DatabaseException { - if(currentCluster == null) + if(currentCluster == null) { currentCluster = getNewResourceCluster(); + } if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); newCluster.foreignLookup = new byte[foreignCounter]; @@ -1017,15 +1022,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s); - int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p); - int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o); ClusterI pc = clusterTable.getClusterProxyByResourceKey(p); ClusterI oc = clusterTable.getClusterProxyByResourceKey(o); clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION); - clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION); - clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION); clusterTranslator.removeStatement(cluster); queryProvider2.invalidateResource(s); @@ -1290,11 +1293,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { - int thread = request.hashCode() & queryProvider2.THREAD_MASK; + //int thread = request.hashCode() & queryProvider2.THREAD_MASK; fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1355,12 +1359,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @Override - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify, Boolean combine) { assertAlive(); @@ -1380,6 +1384,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1393,12 +1398,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -1421,7 +1426,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(!(e instanceof CancelTransactionException)) { if (callback != null) - callback.run(new DatabaseException(e)); + callback.accept(new DatabaseException(e)); } writeState.except(e); @@ -1960,14 +1965,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - - @Override - public void run(DatabaseException e) { - exception.set(e); - } - - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2061,12 +2059,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2077,12 +2070,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2114,7 +2102,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final Write request, final Callback callback) { + public void asyncRequest(final Write request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2142,7 +2130,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final DelayedWrite request, final Callback callback) { + public void asyncRequest(final DelayedWrite request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2159,7 +2147,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final WriteOnly request, final Callback callback) { + public void asyncRequest(final WriteOnly request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2283,16 +2271,16 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return true; } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2306,24 +2294,24 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2337,11 +2325,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); }