X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.procore%2Fsrc%2Ffi%2Fvtt%2Fsimantics%2Fprocore%2Finternal%2FSessionImplSocket.java;h=29c70445bd208dd190c9796b60f2e31f46408dca;hb=acd711765a4a064361b3a51aa0aaeae9881fd43f;hp=a08011fed1172615a2f937cb9de08bc099f3f1de;hpb=782843a248c4f70d28e8146098f9a77977cc6b3c;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java index a08011fed..29c70445b 100644 --- a/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java +++ b/bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java @@ -143,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport; import org.simantics.db.service.CollectionSupport; import org.simantics.db.service.DebugSupport; import org.simantics.db.service.DirectQuerySupport; +import org.simantics.db.service.EventSupport; import org.simantics.db.service.GraphChangeListenerSupport; import org.simantics.db.service.InitSupport; import org.simantics.db.service.LifecycleSupport; @@ -166,6 +167,7 @@ import org.simantics.utils.DataContainer; import org.simantics.utils.Development; import org.simantics.utils.threads.logger.ITask; import org.simantics.utils.threads.logger.ThreadLogger; +import org.slf4j.LoggerFactory; import gnu.trove.procedure.TLongProcedure; import gnu.trove.set.hash.TLongHashSet; @@ -173,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet; public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport { + private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class); + protected static final boolean DEBUG = false; private static final boolean DIAGNOSTICS = false; @@ -289,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this)); serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl()); serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl()); + serviceLocator.registerService(EventSupport.class, new EventSupportImpl()); ServiceActivityUpdaterForWriteTransactions.register(this); this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t); @@ -323,7 +328,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { getClusterTable().refresh(csid, this, clusterUID); } catch (Throwable t) { - Logger.defaultLogError("Refesh failed.", t); + LOGGER.error("refresh({}, {}) failed", thread, csid, t); } } @@ -417,7 +422,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { if(Development.DEVELOPMENT) { try { @@ -456,17 +461,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); assert (null != writer); -// writer.state.barrier.inc(); + writer.asyncBarrier.inc(); try { request.perform(writer); assert (null != writer); } catch (Throwable t) { if (!(t instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t); + LOGGER.error("Write transaction caused an unexpected error, see exception.", t); writeState.except(t); } finally { -// writer.state.barrier.dec(); + writer.asyncBarrier.dec(); // writer.waitAsync(request); } @@ -477,7 +482,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule // Log it first, just to be safe that the error is always logged. if (!(e instanceof CancelTransactionException)) - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + LOGGER.error("Write transaction caused an unexpected error, see exception.", e); // writeState.getGraph().state.barrier.dec(); // writeState.getGraph().waitAsync(request); @@ -553,7 +558,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -566,52 +571,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule VirtualGraph vg = getProvider(request.getProvider()); WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg); - try { - WriteState writeStateT = new WriteState(writer, request, notify, procedure); - writeState = writeStateT; + WriteState writeStateT = new WriteState(writer, request, notify, procedure); + writeState = writeStateT; + assert (null != writer); - assert (null != writer); -// writer.state.barrier.inc(); + try { + writer.asyncBarrier.inc(); writeStateT.setResult(request.perform(writer)); assert (null != writer); - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - } catch (Throwable e) { - -// writer.state.barrier.dec(); -// writer.waitAsync(null); - writeState.except(e); - -// state.stopWriteTransaction(clusterStream); -// -// } catch (Throwable e) { -// // Log it first, just to be safe that the error is always logged. -// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); -// -// try { -// // Callback is client code, we have to be prepared for it to throw unexpected exceptions. -// // All we can do here is to log those, can't really pass them anywhere. -// if (procedure != null) { -// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e); -// else procedure.exception(new DatabaseException(e)); -// } -// } catch (Throwable e2) { -// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2); -// } -// -// clientChanges = new ClientChangesImpl(SessionImplSocket.this); -// -// state.stopWriteTransaction(clusterStream); - } finally { + writer.asyncBarrier.dec(); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); } -// if(notify != null) notify.release(); - task.finish(); } @@ -630,14 +604,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule @Override public void scheduleRequest(final DelayedWrite request, final Consumer callback, final Semaphore notify, Boolean combine) { - final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite"); + final ITask total = ThreadLogger.task("ScheduleDelayedWrite"); assert (request != null); requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); Procedure stateProcedure = new Procedure() { @@ -652,7 +626,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if (t instanceof DatabaseException) callback.accept((DatabaseException) t); else callback.accept(new DatabaseException(t)); } else - Logger.defaultLogError("Unhandled exception", t); + LOGGER.error("Unhandled exception", t); } }; @@ -677,7 +651,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule delayedWriteState = null; - ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit"); + ITask task2 = ThreadLogger.task("DelayedWriteCommit"); fireSessionVariableChange(SessionVariables.QUEUED_WRITES); flushCounter = 0; @@ -1350,7 +1324,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule callback.exception(new DatabaseException(e)); state.stopWriteTransaction(clusterStream); - Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e); + LOGGER.error("Write transaction caused an unexpected error, see exception.", e); } finally { @@ -1374,7 +1348,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1472,7 +1446,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleWrite(new SessionTask(null) { @Override - public void run(int thread) { + public void run0(int thread) { ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request); @@ -1491,19 +1465,22 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule assert (request != null); assert (procedure != null); - //int thread = request.hashCode() & queryProvider2.THREAD_MASK; - requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + ListenerBase listener = getListenerBase(procedure); final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + // This is never synced but increase to prevent it from visiting 0 + newGraph.asyncBarrier.inc(); + try { if (listener != null) { @@ -1541,8 +1518,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { -// newGraph.state.barrier.inc(); - T t = request.perform(newGraph); try { @@ -1555,7 +1530,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(th); } else { - Logger.defaultLogError("Unhandled exception", th); + LOGGER.error("Unhandled exception", th); } } @@ -1568,7 +1543,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(t); } else { - Logger.defaultLogError("Unhandled exception", t); + LOGGER.error("Unhandled exception", t); } try { @@ -1580,15 +1555,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule if(throwable != null) { throwable.set(t2); } else { - Logger.defaultLogError("Unhandled exception", t2); + LOGGER.error("Unhandled exception", t2); } } } - -// newGraph.state.barrier.dec(); -// newGraph.waitAsync(request); + + task.finish(); } @@ -1614,46 +1588,53 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); + ITask task = ThreadLogger.task(request); + final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2()); + newGraph.asyncBarrier.inc(); try { if (listener != null) { try { - QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); - //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true); - //newGraph.processor.query(newGraph, request, null, procedure, listener); - } catch (DatabaseException e) { - Logger.defaultLogError(e); - } + QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure); + } catch (DatabaseException e) { + LOGGER.error("Unhandled query exception", e); + } } else { -// final ResultCallWrappedSingleQueryProcedure4 wrapper = new ResultCallWrappedSingleQueryProcedure4( -// procedure, "request"); + BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph, null, procedure, request, true) { - BlockingAsyncProcedure wrap = new BlockingAsyncProcedure(newGraph.asyncBarrier, newGraph, procedure, request); + public void execute(AsyncReadGraph graph_, T result) { + task.finish(); + super.execute(graph_, result); + } - try { + public void exception(AsyncReadGraph graph_, Throwable t) { + task.finish(); + super.exception(graph_, t); + } - request.perform(newGraph, wrap); - wrap.get(); + }; + try { + wrap.performSync(request); } catch (DatabaseException e) { - - Logger.defaultLogError(e); - + LOGGER.error("Unhandled query exception", e); } } } finally { + newGraph.asyncBarrier.dec(); + fireSessionVariableChange(SessionVariables.QUEUED_READS); } @@ -1676,7 +1657,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1732,7 +1713,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(null, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -1786,7 +1767,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule requestManager.scheduleRead(new SessionRead(throwable, notify) { @Override - public void run(int thread) { + public void run0(int thread) { fireSessionVariableChange(SessionVariables.QUEUED_READS); @@ -2470,6 +2451,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule return; WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null); + reactionGraph.asyncBarrier.inc(); try { @@ -2478,16 +2460,18 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { l.graphChanged(e2); } catch (Throwable ex) { - ex.printStackTrace(); + LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex); } } } finally { + reactionGraph.asyncBarrier.dec(); + } } catch (Throwable t) { - t.printStackTrace(); + LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t); } } @@ -2527,39 +2511,41 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule */ @SuppressWarnings("unchecked") @Override - public synchronized T peekService(Class api) { - - if(serviceKey1 == api) { - return (T)service1; - } else if (serviceKey2 == api) { - // Promote this key - Object result = service2; - service2 = service1; - serviceKey2 = serviceKey1; - service1 = result; - serviceKey1 = api; - return (T)result; - } + public T peekService(Class api) { + if (Layer0.class == api) + return (T) L0; - if (Layer0.class == api) - return (T) L0; - if (ServerInformation.class == api) - return (T) getCachedServerInformation(); - else if (WriteGraphImpl.class == api) - return (T) writeState.getGraph(); - else if (ClusterBuilder.class == api) - return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); - else if (ClusterBuilderFactory.class == api) - return (T)new ClusterBuilderFactoryImpl(this); + synchronized (this) { + if (serviceKey1 == api) { + return (T) service1; + } + if (serviceKey2 == api) { + // Promote this key + Object result = service2; + service2 = service1; + serviceKey2 = serviceKey1; + service1 = result; + serviceKey1 = api; + return (T)result; + } - service2 = service1; - serviceKey2 = serviceKey1; + if (ServerInformation.class == api) + return (T) getCachedServerInformation(); + else if (WriteGraphImpl.class == api) + return (T) writeState.getGraph(); + else if (ClusterBuilder.class == api) + return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport); + else if (ClusterBuilderFactory.class == api) + return (T)new ClusterBuilderFactoryImpl(this); - service1 = serviceLocator.peekService(api); - serviceKey1 = api; + service2 = service1; + serviceKey2 = serviceKey1; - return (T)service1; + service1 = serviceLocator.peekService(api); + serviceKey1 = api; + return (T)service1; + } } /* @@ -2606,9 +2592,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule try { h.valuesChanged(ctx); } catch (Exception e) { - Logger.defaultLogError("monitor handler notification produced the following exception", e); + LOGGER.error("monitor handler notification produced the following exception", e); } catch (LinkageError e) { - Logger.defaultLogError("monitor handler notification produced a linkage error", e); + LOGGER.error("monitor handler notification produced a linkage error", e); } } } @@ -3179,9 +3165,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule }); Throwable t = exception.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); + if (t != null) { + if (t instanceof DatabaseException) + throw (DatabaseException) t; + else + throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t); } return result; @@ -3387,28 +3375,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule asyncRequest(request, (Procedure)procedure); } - - - void check(Throwable t) throws DatabaseException { - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - void check(DataContainer container) throws DatabaseException { - Throwable t = container.get(); - if(t != null) { - if(t instanceof DatabaseException) throw (DatabaseException)t; - else throw new DatabaseException("Unexpected exception", t); - } - } - - - - - - boolean sameProvider(Write request) { if(writeState.getGraph().provider != null) { return writeState.getGraph().provider.equals(request.getProvider()); @@ -3425,9 +3391,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group"); + private void assertNotSession() throws DatabaseException { Thread current = Thread.currentThread(); - if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction."); + if (sessionThreads.contains(current)) + throw new ServiceException("Caller is already inside a transaction."); } void assertAlive() { @@ -3552,8 +3520,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule while(dirtyPrimitives) { dirtyPrimitives = false; - getQueryProvider2().performDirtyUpdates(writer); - getQueryProvider2().performScheduledUpdates(writer); + getQueryProvider2().propagateChangesInQueryCache(writer); + getQueryProvider2().listening.fireListeners(writer); } fireMetadataListeners(writer, clientChanges); @@ -3613,4 +3581,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule state.setCombine(false); } + @SuppressWarnings("unchecked") + @Override + public T l0() { + return (T) L0; + } + }