X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=9a86dc1fa2ae813b0b3a137b51a9063546af27af;hp=6c3497f1569b4040ae10200e8a78d1a913bc7cdc;hb=60887e497000cd31f5e5f2008d681b8612a21d26;hpb=0ae2b770234dfc3cbb18bd38f324125cf0faca07 diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index 6c3497f15..9a86dc1fa 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -27,6 +27,7 @@ import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; @@ -118,7 +119,6 @@ import org.simantics.db.request.ExternalRead; import org.simantics.db.request.MultiRead; import org.simantics.db.request.Read; import org.simantics.db.request.ReadInterface; -import org.simantics.db.request.UndoTraits; import org.simantics.db.request.Write; import org.simantics.db.request.WriteInterface; import org.simantics.db.request.WriteOnly; @@ -156,7 +156,6 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; @@ -325,40 +324,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private Object result; final Semaphore sema = new Semaphore(0); private Throwable throwable = null; - final Callback callback = new Callback() { - @Override - public void run(DatabaseException e) { - synchronized (TaskHelper.this) { - throwable = e; - } + final Consumer callback = e -> { + synchronized (TaskHelper.this) { + throwable = e; } }; final Procedure proc = new Procedure() { @Override public void execute(Object result) { - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (t instanceof DatabaseException) - callback.run((DatabaseException)t); + callback.accept((DatabaseException)t); else - callback.run(new DatabaseException("" + name + "operation failed.", t)); - } - }; - final WriteTraits writeTraits = new WriteTraits() { - @Override - public UndoTraits getUndoTraits() { - return null; - } - @Override - public VirtualGraph getProvider() { - return null; + callback.accept(new DatabaseException("" + name + "operation failed.", t)); } }; + final WriteTraits writeTraits = new WriteTraits() {}; TaskHelper(String name) { this.name = name; } + @SuppressWarnings("unchecked") T getResult() { return (T)result; } @@ -397,7 +385,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return null; } - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -405,7 +393,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify, Boolean combine) { assert (request != null); @@ -450,12 +438,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -626,7 +614,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -634,7 +622,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); @@ -652,13 +640,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { if (callback != null) - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (callback != null) { - if (t instanceof DatabaseException) callback.run((DatabaseException) t); - else callback.run(new DatabaseException(t)); + if (t instanceof DatabaseException) callback.accept((DatabaseException) t); + else callback.accept(new DatabaseException(t)); } else Logger.defaultLogError("Unhandled exception", t); } @@ -675,6 +663,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } catch (Throwable e) { delayedWriteState.except(e); total.finish(); + dwg.close(); return; } finally { // newGraph.state.barrier.dec(); @@ -858,14 +847,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private void maintainCluster(ClusterImpl before, ClusterI after_) { if(after_ != null && after_ != before) { ClusterImpl after = (ClusterImpl)after_; - if(currentCluster == before) currentCluster = after; + if(currentCluster == before) { + currentCluster = after; + } clusterTable.replaceCluster(after); } } public int createResourceKey(int foreignCounter) throws DatabaseException { - if(currentCluster == null) + if(currentCluster == null) { currentCluster = getNewResourceCluster(); + } if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); newCluster.foreignLookup = new byte[foreignCounter]; @@ -1299,7 +1291,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { - int thread = request.hashCode() & queryProvider2.THREAD_MASK; + //int thread = request.hashCode() & queryProvider2.THREAD_MASK; fireSessionVariableChange(SessionVariables.QUEUED_WRITES); @@ -1364,12 +1356,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @Override - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify, Boolean combine) { assertAlive(); @@ -1402,12 +1394,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -1430,7 +1422,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(!(e instanceof CancelTransactionException)) { if (callback != null) - callback.run(new DatabaseException(e)); + callback.accept(new DatabaseException(e)); } writeState.except(e); @@ -1969,14 +1961,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - - @Override - public void run(DatabaseException e) { - exception.set(e); - } - - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2070,12 +2055,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2086,12 +2066,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2123,7 +2098,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final Write request, final Callback callback) { + public void asyncRequest(final Write request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2151,7 +2126,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final DelayedWrite request, final Callback callback) { + public void asyncRequest(final DelayedWrite request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2168,7 +2143,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final WriteOnly request, final Callback callback) { + public void asyncRequest(final WriteOnly request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2292,16 +2267,16 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return true; } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2315,24 +2290,24 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2346,11 +2321,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); }