X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=f8fd7d17ddab55e7274388ff259155918227486f;hb=209d3c557147c8c2c8ae4d8500d1b9a8c5c3166d;hp=d32c066bcdfc9bee732cd6ecf3829b5b0eb7a9a1;hpb=090aaaa7349a99f59b70cd1fbcdd08c270aca2a2;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index d32c066bc..f8fd7d17d 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -461,7 +461,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); assert (null != writer); -// writer.state.barrier.inc(); + writer.asyncBarrier.inc(); try { request.perform(writer); @@ -471,7 +471,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); writeState.except(t); } finally { -// writer.state.barrier.dec(); + writer.asyncBarrier.dec(); // writer.waitAsync(request); } @@ -571,52 +571,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule VirtualGraph vg = getProvider(request.getProvider()); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - try { - WriteState writeStateT = new WriteState(writer, request, notify, procedure); - writeState = writeStateT; + WriteState writeStateT = new WriteState(writer, request, notify, procedure); + writeState = writeStateT; + assert (null != writer); - assert (null != writer); -// writer.state.barrier.inc(); + try { + writer.asyncBarrier.inc(); writeStateT.setResult(request.perform(writer)); assert (null != writer); - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - } catch (Throwable e) { - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - writeState.except(e); - -// state.stopWriteTransaction(clusterStream); -// -// } catch (Throwable e) { -// // Log it first, just to be safe that the error is always logged. -// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); -// -// try { -// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. -// // All we can do here is to log those, can't really pass them anywhere. -// if (procedure != null) { -// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); -// else procedure.exception(new DatabaseException(e)); -// } -// } catch (Throwable e2) { -// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); -// } -// -// clientChanges = new ClientChangesImpl(SessionImplSocket.this); -// -// state.stopWriteTransaction(clusterStream); - } finally { + writer.asyncBarrier.dec(); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } -// if(notify != null) notify.release(); - task.finish(); } @@ -635,7 +604,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + final ITask total = ThreadLogger.task("ScheduleDelayedWrite"); assert (request != null); @@ -682,7 +651,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule delayedWriteState = null; - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + ITask task2 = ThreadLogger.task("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; @@ -1496,8 +1465,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - //int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override @@ -1505,6 +1472,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); @@ -1549,8 +1518,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { -// newGraph.state.barrier.inc(); - T t = request.perform(newGraph); try { @@ -1594,9 +1561,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); + + task.finish(); } @@ -1626,42 +1592,49 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + newGraph.asyncBarrier.inc(); try { if (listener != null) { try { - QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); - //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true); - //newGraph.processor.query(newGraph, request, null, procedure, listener); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + LOGGER.error("Unhandled query exception", e); + } } else { -// final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( -// procedure, "request"); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph.asyncBarrier, newGraph, procedure, request); + public void execute(AsyncReadGraph graph_, T result) { + task.finish(); + super.execute(graph_, result); + } - try { + public void exception(AsyncReadGraph graph_, Throwable t) { + task.finish(); + super.exception(graph_, t); + } - request.perform(newGraph, wrap); - wrap.get(); + }; + try { + wrap.performSync(request); } catch (DatabaseException e) { - - Logger.defaultLogError(e); - + LOGGER.error("Unhandled query exception", e); } } } finally { + newGraph.asyncBarrier.dec(); + fireSessionVariableChange(SessionVariables.QUEUED_READS); } @@ -2478,6 +2451,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return; WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); + reactionGraph.asyncBarrier.inc(); try { @@ -2492,6 +2466,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } finally { + reactionGraph.asyncBarrier.dec(); + } } catch (Throwable t) { @@ -3542,8 +3518,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges);