X-Git-Url: https://gerrit.simantics.org/r/gitweb?p=simantics%2Fplatform.git;a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=29c70445bd208dd190c9796b60f2e31f46408dca;hp=c286f02c7d4434e69fe31fc0978ef057e16764d8;hb=HEAD;hpb=edbb4df64407826271ee6423451401684e9cd68c diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index c286f02c7..29c70445b 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -1,5 +1,5 @@ /******************************************************************************* - * Copyright (c) 2007, 2010 Association for Decentralized Information Management + * Copyright (c) 2007, 2018 Association for Decentralized Information Management * in Industry THTH ry. * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v1.0 @@ -27,16 +27,19 @@ import java.util.TreeMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.eclipse.core.runtime.Platform; import org.simantics.databoard.Bindings; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ChangeSet; import org.simantics.db.DevelopmentKeys; +import org.simantics.db.Disposable; import org.simantics.db.ExternalValueSupport; import org.simantics.db.Metadata; import org.simantics.db.MonitorContext; import org.simantics.db.MonitorHandler; +import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.ResourceSerializer; import org.simantics.db.Session; @@ -50,10 +53,13 @@ import org.simantics.db.common.Indexing; import org.simantics.db.common.TransactionPolicyRelease; import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter; import org.simantics.db.common.procedure.adapter.ProcedureAdapter; +import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter; import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener; +import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure; import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener; import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure; @@ -73,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.exception.RuntimeDatabaseException; import org.simantics.db.exception.ServiceException; import org.simantics.db.exception.ServiceNotFoundException; +import org.simantics.db.impl.BlockingAsyncProcedure; import org.simantics.db.impl.ClusterBase; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterTraitsBase; @@ -86,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl; import org.simantics.db.impl.graph.WriteSupport; import org.simantics.db.impl.internal.RandomAccessValueSupport; import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4; -import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4; +import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure; +import org.simantics.db.impl.query.QueryCache; +import org.simantics.db.impl.query.QueryCacheBase; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QueryProcessor.SessionRead; import org.simantics.db.impl.query.QueryProcessor.SessionTask; @@ -134,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; @@ -155,9 +165,9 @@ import org.simantics.db.service.XSupport; import org.simantics.layer0.Layer0; import org.simantics.utils.DataContainer; import org.simantics.utils.Development; -import org.simantics.utils.datastructures.Callback; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.LoggerFactory; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; @@ -165,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class); + protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; @@ -281,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + serviceLocator.registerService(EventSupport.class, new EventSupportImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); @@ -315,7 +328,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { getClusterTable().refresh(csid, this, clusterUID); } catch (Throwable t) { - Logger.defaultLogError("Refesh failed.", t); + LOGGER.error("refresh({}, {}) failed", thread, csid, t); } } @@ -324,31 +337,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private Object result; final Semaphore sema = new Semaphore(0); private Throwable throwable = null; - final Callback callback = new Callback() { - @Override - public void run(DatabaseException e) { - synchronized (TaskHelper.this) { - throwable = e; - } + final Consumer callback = e -> { + synchronized (TaskHelper.this) { + throwable = e; } }; final Procedure proc = new Procedure() { @Override public void execute(Object result) { - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (t instanceof DatabaseException) - callback.run((DatabaseException)t); + callback.accept((DatabaseException)t); else - callback.run(new DatabaseException("" + name + "operation failed.", t)); + callback.accept(new DatabaseException("" + name + "operation failed.", t)); } }; final WriteTraits writeTraits = new WriteTraits() {}; TaskHelper(String name) { this.name = name; } + @SuppressWarnings("unchecked") T getResult() { return (T)result; } @@ -387,7 +398,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return null; } - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -395,7 +406,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final Write request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final Write request, final Consumer callback, final Semaphore notify, Boolean combine) { assert (request != null); @@ -408,12 +419,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } } - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -431,6 +440,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); @@ -440,39 +450,39 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); assert (null != writer); -// writer.state.barrier.inc(); + writer.asyncBarrier.inc(); try { request.perform(writer); assert (null != writer); } catch (Throwable t) { if (!(t instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); + LOGGER.error("Write transaction caused an unexpected error, see exception.", t); writeState.except(t); } finally { -// writer.state.barrier.dec(); + writer.asyncBarrier.dec(); // writer.waitAsync(request); } - assert(!queryProvider2.dirty); + assert(!queryProvider2.cache.dirty); } catch (Throwable e) { // Log it first, just to be safe that the error is always logged. if (!(e instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + LOGGER.error("Write transaction caused an unexpected error, see exception.", e); // writeState.getGraph().state.barrier.dec(); // writeState.getGraph().waitAsync(request); @@ -499,7 +509,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule // state.cancelCommit(context, clusterStream); // if (!empty) { // if (!context.isOk()) // this is a blocking operation -// throw new InternalException("Cancel failed. This should never happen. Contact application support."); +// throw new InternalException("Cancel failed. This should never happen."); // getQueryProvider2().performDirtyUpdates(writeState.getGraph()); // } // state.cancelCommit2(context, clusterStream); @@ -545,69 +555,37 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); VirtualGraph vg = getProvider(request.getProvider()); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - try { - WriteState writeStateT = new WriteState(writer, request, notify, procedure); - writeState = writeStateT; + WriteState writeStateT = new WriteState(writer, request, notify, procedure); + writeState = writeStateT; + assert (null != writer); - assert (null != writer); -// writer.state.barrier.inc(); + try { + writer.asyncBarrier.inc(); writeStateT.setResult(request.perform(writer)); assert (null != writer); - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - } catch (Throwable e) { - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - writeState.except(e); - -// state.stopWriteTransaction(clusterStream); -// -// } catch (Throwable e) { -// // Log it first, just to be safe that the error is always logged. -// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); -// -// try { -// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. -// // All we can do here is to log those, can't really pass them anywhere. -// if (procedure != null) { -// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); -// else procedure.exception(new DatabaseException(e)); -// } -// } catch (Throwable e2) { -// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); -// } -// -// clientChanges = new ClientChangesImpl(SessionImplSocket.this); -// -// state.stopWriteTransaction(clusterStream); - } finally { + writer.asyncBarrier.dec(); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } -// if(notify != null) notify.release(); - task.finish(); } @@ -616,7 +594,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @@ -624,33 +602,31 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean) */ @Override - public void scheduleRequest(final DelayedWrite request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + final ITask total = ThreadLogger.task("ScheduleDelayedWrite"); assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @Override public void execute(Object result) { if (callback != null) - callback.run(null); + callback.accept(null); } @Override public void exception(Throwable t) { if (callback != null) { - if (t instanceof DatabaseException) callback.run((DatabaseException) t); - else callback.run(new DatabaseException(t)); + if (t instanceof DatabaseException) callback.accept((DatabaseException) t); + else callback.accept(new DatabaseException(t)); } else - Logger.defaultLogError("Unhandled exception", t); + LOGGER.error("Unhandled exception", t); } }; @@ -675,10 +651,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule delayedWriteState = null; - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + ITask task2 = ThreadLogger.task("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -849,14 +826,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule private void maintainCluster(ClusterImpl before, ClusterI after_) { if(after_ != null && after_ != before) { ClusterImpl after = (ClusterImpl)after_; - if(currentCluster == before) currentCluster = after; + if(currentCluster == before) { + currentCluster = after; + } clusterTable.replaceCluster(after); } } public int createResourceKey(int foreignCounter) throws DatabaseException { - if(currentCluster == null) + if(currentCluster == null) { currentCluster = getNewResourceCluster(); + } if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) { ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster(); newCluster.foreignLookup = new byte[foreignCounter]; @@ -1017,15 +997,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s); - int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p); - int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o); ClusterI pc = clusterTable.getClusterProxyByResourceKey(p); ClusterI oc = clusterTable.getClusterProxyByResourceKey(o); clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION); - clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION); - clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION); + clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION); clusterTranslator.removeStatement(cluster); queryProvider2.invalidateResource(s); @@ -1290,11 +1268,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { - int thread = request.hashCode() & queryProvider2.THREAD_MASK; + //int thread = request.hashCode() & queryProvider2.THREAD_MASK; fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1345,7 +1324,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule callback.exception(new DatabaseException(e)); state.stopWriteTransaction(clusterStream); - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + LOGGER.error("Write transaction caused an unexpected error, see exception.", e); } finally { @@ -1355,23 +1334,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify) { scheduleRequest(request, callback, notify, null); } @Override - public void scheduleRequest(final WriteOnly request, final Callback callback, final Semaphore notify, Boolean combine) { + public void scheduleRequest(final WriteOnly request, final Consumer callback, final Semaphore notify, Boolean combine) { assertAlive(); assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1380,6 +1357,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; + Disposable.safeDispose(clientChanges); clientChanges = new ClientChangesImpl(SessionImplSocket.this); acquireWriteOnly(); @@ -1393,12 +1371,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void execute(Object result) { - if(callback != null) callback.run(null); + if(callback != null) callback.accept(null); } @Override public void exception(Throwable t) { - if(callback != null) callback.run((DatabaseException)t); + if(callback != null) callback.accept((DatabaseException)t); } }); @@ -1421,7 +1399,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(!(e instanceof CancelTransactionException)) { if (callback != null) - callback.run(new DatabaseException(e)); + callback.accept(new DatabaseException(e)); } writeState.except(e); @@ -1465,12 +1443,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleWrite(new SessionTask(request, thread) { + requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1489,25 +1465,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - int thread = request.hashCode() & queryProvider2.THREAD_MASK; - - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { try { - newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure() { + + AsyncProcedure ap = new AsyncProcedure() { @Override public void exception(AsyncReadGraph graph, Throwable t) { @@ -1525,7 +1505,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule procedure.execute(graph, t); } - }, listener); + }; + + QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true); + } catch (Throwable t) { // This is handled by the AsyncProcedure //Logger.defaultLogError("Internal error", t); @@ -1535,8 +1518,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { -// newGraph.state.barrier.inc(); - T t = request.perform(newGraph); try { @@ -1549,7 +1530,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(th); } else { - Logger.defaultLogError("Unhandled exception", th); + LOGGER.error("Unhandled exception", th); } } @@ -1562,7 +1543,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(t); } else { - Logger.defaultLogError("Unhandled exception", t); + LOGGER.error("Unhandled exception", t); } try { @@ -1574,15 +1555,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(t2); } else { - Logger.defaultLogError("Unhandled exception", t2); + LOGGER.error("Unhandled exception", t2); } } } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); + + task.finish(); } @@ -1605,13 +1585,84 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread) { + requestManager.scheduleRead(new SessionRead(null, notify) { + + @Override + public void run0(int thread) { + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + ITask task = ThreadLogger.task(request); + + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + newGraph.asyncBarrier.inc(); + + try { + + if (listener != null) { + + try { + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + LOGGER.error("Unhandled query exception", e); + } + + } else { + + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { + + public void execute(AsyncReadGraph graph_, T result) { + task.finish(); + super.execute(graph_, result); + } + + public void exception(AsyncReadGraph graph_, Throwable t) { + task.finish(); + super.exception(graph_, t); + } + + }; + + try { + wrap.performSync(request); + } catch (DatabaseException e) { + LOGGER.error("Unhandled query exception", e); + } + + } + + } finally { + + newGraph.asyncBarrier.dec(); + + fireSessionVariableChange(SessionVariables.QUEUED_READS); + + } + + } + + }); + + } + + public void scheduleRequest(final MultiRead request, final SyncMultiProcedure procedure, final Semaphore notify) { + + assert (request != null); + assert (procedure != null); + + int thread = request.hashCode() & queryProvider2.THREAD_MASK; + + int sync = notify != null ? thread : -1; + + requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ListenerBase listener = getListenerBase(procedure); + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); try { @@ -1624,22 +1675,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } else { - final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( - procedure, "request"); + final ResultCallWrappedSyncQueryProcedure wrapper = new ResultCallWrappedSyncQueryProcedure(procedure); try { -// newGraph.state.barrier.inc(); - request.perform(newGraph, wrapper); -// newGraph.waitAsync(request); - } catch (Throwable t) { - wrapper.exception(newGraph, t); -// newGraph.waitAsync(request); - + t.printStackTrace(); } @@ -1656,7 +1700,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); } - + public void scheduleRequest(final AsyncMultiRead request, final AsyncMultiProcedure procedure, final Semaphore notify) { assert (request != null); @@ -1666,10 +1710,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int sync = notify != null ? thread : -1; - requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) { + requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1720,10 +1764,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) { + requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1735,25 +1779,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (listener != null) { - newGraph.processor.query(newGraph, request, null, new Procedure() { - - @Override - public void exception(Throwable t) { - procedure.exception(t); - if(throwable != null) { - throwable.set(t); - } - } - - @Override - public void execute(T t) { - if(result != null) result.set(t); - procedure.execute(t); - } - - }, listener); - -// newGraph.waitAsync(request); + try { + QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + Logger.defaultLogError(e); + } } else { @@ -1960,14 +1990,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - - @Override - public void run(DatabaseException e) { - exception.set(e); - } - - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2061,12 +2084,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertNotSession(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2077,12 +2095,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assertAlive(); Semaphore notify = new Semaphore(0); final DataContainer exception = new DataContainer(); - scheduleRequest(request, new Callback() { - @Override - public void run(DatabaseException e) { - exception.set(e); - } - }, notify); + scheduleRequest(request, e -> exception.set(e), notify); acquire(notify, request); if(exception.get() != null) throw exception.get(); } @@ -2114,7 +2127,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final Write request, final Callback callback) { + public void asyncRequest(final Write request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2142,7 +2155,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final DelayedWrite request, final Callback callback) { + public void asyncRequest(final DelayedWrite request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2159,7 +2172,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public void asyncRequest(final WriteOnly request, final Callback callback) { + public void asyncRequest(final WriteOnly request, final Consumer callback) { scheduleRequest(request, callback, null); @@ -2283,16 +2296,16 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return true; } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2306,24 +2319,24 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } - static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback runnable) { + static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer runnable) { - Callback composite = new Callback() { + Consumer composite = new Consumer() { AtomicInteger ready = new AtomicInteger(support.providers.size() + 1); @Override - public void run(ReadGraphImpl graph) { + public void accept(ReadGraphImpl graph) { if(ready.decrementAndGet() == 0) { - runnable.run(graph); + runnable.accept(graph); } } @@ -2337,11 +2350,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule e.printStackTrace(); } } else { - composite.run(graph); + composite.accept(graph); } } - composite.run(graph); + composite.accept(graph); } @@ -2438,6 +2451,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return; WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); + reactionGraph.asyncBarrier.inc(); try { @@ -2446,16 +2460,18 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { l.graphChanged(e2); } catch (Throwable ex) { - ex.printStackTrace(); + LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex); } } } finally { + reactionGraph.asyncBarrier.dec(); + } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t); } } @@ -2495,39 +2511,41 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule */ @SuppressWarnings("unchecked") @Override - public synchronized T peekService(Class api) { - - if(serviceKey1 == api) { - return (T)service1; - } else if (serviceKey2 == api) { - // Promote this key - Object result = service2; - service2 = service1; - serviceKey2 = serviceKey1; - service1 = result; - serviceKey1 = api; - return (T)result; - } + public T peekService(Class api) { + if (Layer0.class == api) + return (T) L0; - if (Layer0.class == api) - return (T) L0; - if (ServerInformation.class == api) - return (T) getCachedServerInformation(); - else if (WriteGraphImpl.class == api) - return (T) writeState.getGraph(); - else if (ClusterBuilder.class == api) - return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); - else if (ClusterBuilderFactory.class == api) - return (T)new ClusterBuilderFactoryImpl(this); + synchronized (this) { + if (serviceKey1 == api) { + return (T) service1; + } + if (serviceKey2 == api) { + // Promote this key + Object result = service2; + service2 = service1; + serviceKey2 = serviceKey1; + service1 = result; + serviceKey1 = api; + return (T)result; + } - service2 = service1; - serviceKey2 = serviceKey1; + if (ServerInformation.class == api) + return (T) getCachedServerInformation(); + else if (WriteGraphImpl.class == api) + return (T) writeState.getGraph(); + else if (ClusterBuilder.class == api) + return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); + else if (ClusterBuilderFactory.class == api) + return (T)new ClusterBuilderFactoryImpl(this); - service1 = serviceLocator.peekService(api); - serviceKey1 = api; + service2 = service1; + serviceKey2 = serviceKey1; - return (T)service1; + service1 = serviceLocator.peekService(api); + serviceKey1 = api; + return (T)service1; + } } /* @@ -2574,9 +2592,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { h.valuesChanged(ctx); } catch (Exception e) { - Logger.defaultLogError("monitor handler notification produced the following exception", e); + LOGGER.error("monitor handler notification produced the following exception", e); } catch (LinkageError e) { - Logger.defaultLogError("monitor handler notification produced a linkage error", e); + LOGGER.error("monitor handler notification produced a linkage error", e); } } } @@ -3060,21 +3078,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule final ArrayList result = new ArrayList(); final DataContainer exception = new DataContainer(); - syncRequest(request, new AsyncMultiProcedure() { + syncRequest(request, new SyncMultiProcedure() { @Override - public void execute(AsyncReadGraph graph, T t) { + public void execute(ReadGraph graph, T t) { synchronized(result) { result.add(t); } } @Override - public void finished(AsyncReadGraph graph) { + public void finished(ReadGraph graph) { } @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { exception.set(t); } @@ -3092,33 +3110,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule } @Override - public Collection syncRequest(MultiRead request, AsyncMultiListener procedure) throws DatabaseException { + public Collection syncRequest(MultiRead arg0, SyncMultiProcedure arg1) throws DatabaseException { assertNotSession(); - return syncRequest(request, (AsyncMultiProcedure)procedure); + throw new Error("Not implemented!"); } @Override public Collection syncRequest(MultiRead request, SyncMultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiListener(procedure)); + return syncRequest(request, (SyncMultiProcedure)procedure); } @Override public Collection syncRequest(MultiRead request, MultiListener procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiListener(procedure)); - } - - @Override - public Collection syncRequest(MultiRead request, SyncMultiProcedure procedure) throws DatabaseException { - assertNotSession(); - return syncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public Collection syncRequest(MultiRead request, MultiProcedure procedure) throws DatabaseException { assertNotSession(); - return syncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + return syncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3153,9 +3165,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + if (t != null) { + if (t instanceof DatabaseException) + throw (DatabaseException) t; + else + throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; @@ -3274,38 +3288,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert(request != null); - asyncRequest(request, new AsyncMultiProcedureAdapter() { + asyncRequest(request, new SyncMultiProcedureAdapter() { @Override - public void exception(AsyncReadGraph graph, Throwable t) { + public void exception(ReadGraph graph, Throwable t) { t.printStackTrace(); } }); } - @Override - public void asyncRequest(MultiRead request, AsyncMultiListener procedure) { - asyncRequest(request, (AsyncMultiProcedure)procedure); - } - @Override public void asyncRequest(MultiRead request, SyncMultiListener procedure) { - asyncRequest(request, new SyncToAsyncMultiListener(procedure)); + asyncRequest(request, (SyncMultiProcedure)procedure); } @Override public void asyncRequest(MultiRead request, MultiListener procedure) { - asyncRequest(request, new NoneToAsyncMultiListener(procedure)); + asyncRequest(request, new NoneToSyncMultiListener(procedure)); } @Override public void asyncRequest(MultiRead request, SyncMultiProcedure procedure) { - asyncRequest(request, new SyncToAsyncMultiProcedure(procedure)); + scheduleRequest(request, procedure, null); } @Override public void asyncRequest(MultiRead request, MultiProcedure procedure) { - asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); + asyncRequest(request, new NoneToSyncMultiProcedure(procedure)); } @Override @@ -3347,17 +3356,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, new NoneToAsyncMultiProcedure(procedure)); } - @Override - public Collection syncRequest(MultiRead arg0, AsyncMultiProcedure arg1) throws DatabaseException { - assertNotSession(); - throw new Error("Not implemented!"); - } - - @Override - public void asyncRequest(MultiRead arg0, AsyncMultiProcedure arg1) { - throw new Error("Not implemented!"); - } - @Override final public void asyncRequest(final ExternalRead request) { @@ -3377,28 +3375,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, (Procedure)procedure); } - - - void check(Throwable t) throws DatabaseException { - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - void check(DataContainer container) throws DatabaseException { - Throwable t = container.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - - - - - boolean sameProvider(Write request) { if(writeState.getGraph().provider != null) { return writeState.getGraph().provider.equals(request.getProvider()); @@ -3415,9 +3391,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); + private void assertNotSession() throws DatabaseException { Thread current = Thread.currentThread(); - if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); + if (sessionThreads.contains(current)) + throw new ServiceException("Caller is already inside a transaction."); } void assertAlive() { @@ -3542,8 +3520,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges); @@ -3603,4 +3581,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule state.setCombine(false); } + @SuppressWarnings("unchecked") + @Override + public T l0() { + return (T) L0; + } + }