import org.simantics.db.service.CollectionSupport;
import org.simantics.db.service.DebugSupport;
import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.EventSupport;
import org.simantics.db.service.GraphChangeListenerSupport;
import org.simantics.db.service.InitSupport;
import org.simantics.db.service.LifecycleSupport;
import org.simantics.utils.Development;
import org.simantics.utils.threads.logger.ITask;
import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.LoggerFactory;
import gnu.trove.procedure.TLongProcedure;
import gnu.trove.set.hash.TLongHashSet;
public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
+ private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
+
protected static final boolean DEBUG = false;
private static final boolean DIAGNOSTICS = false;
serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
+ serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
ServiceActivityUpdaterForWriteTransactions.register(this);
this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
if(Development.DEVELOPMENT) {
try {
requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
@Override
public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
- final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+ final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
assert (request != null);
requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
Procedure<Object> stateProcedure = new Procedure<Object>() {
delayedWriteState = null;
- ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+ ITask task2 = ThreadLogger.task("DelayedWriteCommit");
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
assert (request != null);
assert (procedure != null);
- //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
ListenerBase listener = getListenerBase(procedure);
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ // This is never synced but increase to prevent it from visiting 0
+ newGraph.asyncBarrier.inc();
+
try {
if (listener != null) {
try {
-// newGraph.state.barrier.inc();
-
T t = request.perform(newGraph);
try {
}
}
-
-// newGraph.state.barrier.dec();
-// newGraph.waitAsync(request);
+
+ task.finish();
}
requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
try {
QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
- //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
- //newGraph.processor.query(newGraph, request, null, procedure, listener);
} catch (DatabaseException e) {
Logger.defaultLogError(e);
}
} else {
-// final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-// procedure, "request");
-
- BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
+
+ public void execute(AsyncReadGraph graph_, T result) {
+ task.finish();
+ super.execute(graph_, result);
+ }
+
+ public void exception(AsyncReadGraph graph_, Throwable t) {
+ task.finish();
+ super.exception(graph_, t);
+ }
+
+ };
try {
request.perform(newGraph, wrap);
+ wrap.dec();
wrap.get();
} catch (DatabaseException e) {
requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
try {
l.graphChanged(e2);
} catch (Throwable ex) {
- ex.printStackTrace();
+ LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
}
}
}
} catch (Throwable t) {
- t.printStackTrace();
+ LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
}
}
});
Throwable t = exception.get();
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ if (t != null) {
+ if (t instanceof DatabaseException)
+ throw (DatabaseException) t;
+ else
+ throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
}
return result;
asyncRequest(request, (Procedure<T>)procedure);
}
-
-
- void check(Throwable t) throws DatabaseException {
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception", t);
- }
- }
-
- void check(DataContainer<Throwable> container) throws DatabaseException {
- Throwable t = container.get();
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception", t);
- }
- }
-
-
-
-
-
-
boolean sameProvider(Write request) {
if(writeState.getGraph().provider != null) {
return writeState.getGraph().provider.equals(request.getProvider());
public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
+
private void assertNotSession() throws DatabaseException {
Thread current = Thread.currentThread();
- if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
+ if (sessionThreads.contains(current))
+ throw new ServiceException("Caller is already inside a transaction.");
}
void assertAlive() {
while(dirtyPrimitives) {
dirtyPrimitives = false;
- getQueryProvider2().performDirtyUpdates(writer);
- getQueryProvider2().performScheduledUpdates(writer);
+ getQueryProvider2().propagateChangesInQueryCache(writer);
+ getQueryProvider2().listening.fireListeners(writer);
}
fireMetadataListeners(writer, clientChanges);