/*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.eclipse.core.runtime.Platform;
import org.simantics.databoard.Bindings;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.ChangeSet;
import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
import org.simantics.db.ExternalValueSupport;
import org.simantics.db.Metadata;
import org.simantics.db.MonitorContext;
import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.ResourceSerializer;
import org.simantics.db.Session;
import org.simantics.db.common.TransactionPolicyRelease;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.exception.ServiceException;
import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
import org.simantics.db.impl.ClusterBase;
import org.simantics.db.impl.ClusterI;
import org.simantics.db.impl.ClusterTraitsBase;
import org.simantics.db.impl.graph.WriteSupport;
import org.simantics.db.impl.internal.RandomAccessValueSupport;
import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
import org.simantics.db.impl.query.QueryProcessor;
import org.simantics.db.impl.query.QueryProcessor.SessionRead;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.db.service.CollectionSupport;
import org.simantics.db.service.DebugSupport;
import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.EventSupport;
import org.simantics.db.service.GraphChangeListenerSupport;
import org.simantics.db.service.InitSupport;
import org.simantics.db.service.LifecycleSupport;
import org.simantics.layer0.Layer0;
import org.simantics.utils.DataContainer;
import org.simantics.utils.Development;
-import org.simantics.utils.datastructures.Callback;
import org.simantics.utils.threads.logger.ITask;
import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.LoggerFactory;
import gnu.trove.procedure.TLongProcedure;
import gnu.trove.set.hash.TLongHashSet;
public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
+ private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
+
protected static final boolean DEBUG = false;
private static final boolean DIAGNOSTICS = false;
serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
+ serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
ServiceActivityUpdaterForWriteTransactions.register(this);
this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
try {
getClusterTable().refresh(csid, this, clusterUID);
} catch (Throwable t) {
- Logger.defaultLogError("Refesh failed.", t);
+ LOGGER.error("refresh({}, {}) failed", thread, csid, t);
}
}
private Object result;
final Semaphore sema = new Semaphore(0);
private Throwable throwable = null;
- final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- synchronized (TaskHelper.this) {
- throwable = e;
- }
+ final Consumer<DatabaseException> callback = e -> {
+ synchronized (TaskHelper.this) {
+ throwable = e;
}
};
final Procedure<Object> proc = new Procedure<Object>() {
@Override
public void execute(Object result) {
- callback.run(null);
+ callback.accept(null);
}
@Override
public void exception(Throwable t) {
if (t instanceof DatabaseException)
- callback.run((DatabaseException)t);
+ callback.accept((DatabaseException)t);
else
- callback.run(new DatabaseException("" + name + "operation failed.", t));
+ callback.accept(new DatabaseException("" + name + "operation failed.", t));
}
};
final WriteTraits writeTraits = new WriteTraits() {};
TaskHelper(String name) {
this.name = name;
}
+ @SuppressWarnings("unchecked")
<T> T getResult() {
return (T)result;
}
return null;
}
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
* @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
*/
@Override
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
assert (request != null);
}
}
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+ requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
if(Development.DEVELOPMENT) {
try {
try {
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
VirtualGraph vg = getProvider(request.getProvider());
@Override
public void execute(Object result) {
- if(callback != null) callback.run(null);
+ if(callback != null) callback.accept(null);
}
@Override
public void exception(Throwable t) {
- if(callback != null) callback.run((DatabaseException)t);
+ if(callback != null) callback.accept((DatabaseException)t);
}
});
assert (null != writer);
-// writer.state.barrier.inc();
+ writer.asyncBarrier.inc();
try {
request.perform(writer);
assert (null != writer);
} catch (Throwable t) {
if (!(t instanceof CancelTransactionException))
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
writeState.except(t);
} finally {
-// writer.state.barrier.dec();
+ writer.asyncBarrier.dec();
// writer.waitAsync(request);
}
- assert(!queryProvider2.dirty);
+ assert(!queryProvider2.cache.dirty);
} catch (Throwable e) {
// Log it first, just to be safe that the error is always logged.
if (!(e instanceof CancelTransactionException))
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
// writeState.getGraph().state.barrier.dec();
// writeState.getGraph().waitAsync(request);
// state.cancelCommit(context, clusterStream);
// if (!empty) {
// if (!context.isOk()) // this is a blocking operation
-// throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+// throw new InternalException("Cancel failed. This should never happen.");
// getQueryProvider2().performDirtyUpdates(writeState.getGraph());
// }
// state.cancelCommit2(context, clusterStream);
assert (request != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
VirtualGraph vg = getProvider(request.getProvider());
WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
- try {
- WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
- writeState = writeStateT;
+ WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
+ writeState = writeStateT;
+ assert (null != writer);
- assert (null != writer);
-// writer.state.barrier.inc();
+ try {
+ writer.asyncBarrier.inc();
writeStateT.setResult(request.perform(writer));
assert (null != writer);
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
} catch (Throwable e) {
-
-// writer.state.barrier.dec();
-// writer.waitAsync(null);
-
writeState.except(e);
-
-// state.stopWriteTransaction(clusterStream);
-//
-// } catch (Throwable e) {
-// // Log it first, just to be safe that the error is always logged.
-// Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
-//
-// try {
-// // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
-// // All we can do here is to log those, can't really pass them anywhere.
-// if (procedure != null) {
-// if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
-// else procedure.exception(new DatabaseException(e));
-// }
-// } catch (Throwable e2) {
-// Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
-// }
-//
-// clientChanges = new ClientChangesImpl(SessionImplSocket.this);
-//
-// state.stopWriteTransaction(clusterStream);
-
} finally {
+ writer.asyncBarrier.dec();
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
}
-// if(notify != null) notify.release();
-
task.finish();
}
}
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
* @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
*/
@Override
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
- final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+ final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
assert (request != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
Procedure<Object> stateProcedure = new Procedure<Object>() {
@Override
public void execute(Object result) {
if (callback != null)
- callback.run(null);
+ callback.accept(null);
}
@Override
public void exception(Throwable t) {
if (callback != null) {
- if (t instanceof DatabaseException) callback.run((DatabaseException) t);
- else callback.run(new DatabaseException(t));
+ if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
+ else callback.accept(new DatabaseException(t));
} else
- Logger.defaultLogError("Unhandled exception", t);
+ LOGGER.error("Unhandled exception", t);
}
};
} catch (Throwable e) {
delayedWriteState.except(e);
total.finish();
+ dwg.close();
return;
} finally {
// newGraph.state.barrier.dec();
delayedWriteState = null;
- ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+ ITask task2 = ThreadLogger.task("DelayedWriteCommit");
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
private void maintainCluster(ClusterImpl before, ClusterI after_) {
if(after_ != null && after_ != before) {
ClusterImpl after = (ClusterImpl)after_;
- if(currentCluster == before) currentCluster = after;
+ if(currentCluster == before) {
+ currentCluster = after;
+ }
clusterTable.replaceCluster(after);
}
}
public int createResourceKey(int foreignCounter) throws DatabaseException {
- if(currentCluster == null)
+ if(currentCluster == null) {
currentCluster = getNewResourceCluster();
+ }
if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
newCluster.foreignLookup = new byte[foreignCounter];
try {
int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
- int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
- int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
- clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
- clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
clusterTranslator.removeStatement(cluster);
queryProvider2.invalidateResource(s);
try {
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+ //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
callback.exception(new DatabaseException(e));
state.stopWriteTransaction(clusterStream);
- Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
+ LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
} finally {
}
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
@Override
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
assertAlive();
assert (request != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+ requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
@Override
public void execute(Object result) {
- if(callback != null) callback.run(null);
+ if(callback != null) callback.accept(null);
}
@Override
public void exception(Throwable t) {
- if(callback != null) callback.run((DatabaseException)t);
+ if(callback != null) callback.accept((DatabaseException)t);
}
});
if(!(e instanceof CancelTransactionException)) {
if (callback != null)
- callback.run(new DatabaseException(e));
+ callback.accept(new DatabaseException(e));
}
writeState.except(e);
assert (request != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(null) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
assert (request != null);
assert (procedure != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ITask task = ThreadLogger.task(request);
+
ListenerBase listener = getListenerBase(procedure);
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ // This is never synced but increase to prevent it from visiting 0
+ newGraph.asyncBarrier.inc();
+
try {
if (listener != null) {
try {
- newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+
+ AsyncProcedure ap = new AsyncProcedure<T>() {
@Override
public void exception(AsyncReadGraph graph, Throwable t) {
procedure.execute(graph, t);
}
- }, listener);
+ };
+
+ QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+
} catch (Throwable t) {
// This is handled by the AsyncProcedure
//Logger.defaultLogError("Internal error", t);
try {
-// newGraph.state.barrier.inc();
-
T t = request.perform(newGraph);
try {
if(throwable != null) {
throwable.set(th);
} else {
- Logger.defaultLogError("Unhandled exception", th);
+ LOGGER.error("Unhandled exception", th);
}
}
if(throwable != null) {
throwable.set(t);
} else {
- Logger.defaultLogError("Unhandled exception", t);
+ LOGGER.error("Unhandled exception", t);
}
try {
if(throwable != null) {
throwable.set(t2);
} else {
- Logger.defaultLogError("Unhandled exception", t2);
+ LOGGER.error("Unhandled exception", t2);
}
}
}
-
-// newGraph.state.barrier.dec();
-// newGraph.waitAsync(request);
+
+ task.finish();
}
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+ requestManager.scheduleRead(new SessionRead(null, notify) {
+
+ @Override
+ public void run0(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ ITask task = ThreadLogger.task(request);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+ newGraph.asyncBarrier.inc();
+
+ try {
+
+ if (listener != null) {
+
+ try {
+ QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ LOGGER.error("Unhandled query exception", e);
+ }
+
+ } else {
+
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
+
+ public void execute(AsyncReadGraph graph_, T result) {
+ task.finish();
+ super.execute(graph_, result);
+ }
+
+ public void exception(AsyncReadGraph graph_, Throwable t) {
+ task.finish();
+ super.exception(graph_, t);
+ }
+
+ };
+
+ try {
+ wrap.performSync(request);
+ } catch (DatabaseException e) {
+ LOGGER.error("Unhandled query exception", e);
+ }
+
+ }
+
+ } finally {
+
+ newGraph.asyncBarrier.dec();
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ int sync = notify != null ? thread : -1;
+
+ requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ListenerBase listener = getListenerBase(procedure);
+
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
} else {
- final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
- procedure, "request");
+ final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
try {
-// newGraph.state.barrier.inc();
-
request.perform(newGraph, wrapper);
-// newGraph.waitAsync(request);
-
} catch (Throwable t) {
- wrapper.exception(newGraph, t);
-// newGraph.waitAsync(request);
-
+ t.printStackTrace();
}
});
}
-
+
public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
assert (request != null);
int sync = notify != null ? thread : -1;
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+ requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
- public void run(int thread) {
+ public void run0(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
if (listener != null) {
- newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
- @Override
- public void exception(Throwable t) {
- procedure.exception(t);
- if(throwable != null) {
- throwable.set(t);
- }
- }
-
- @Override
- public void execute(T t) {
- if(result != null) result.set(t);
- procedure.execute(t);
- }
-
- }, listener);
-
-// newGraph.waitAsync(request);
+ try {
+ QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
} else {
assertAlive();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
-
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
-
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
assertNotSession();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
assertAlive();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
}
@Override
- public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
}
@Override
- public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
}
@Override
- public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
return true;
}
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
@Override
- public void run(ReadGraphImpl graph) {
+ public void accept(ReadGraphImpl graph) {
if(ready.decrementAndGet() == 0) {
- runnable.run(graph);
+ runnable.accept(graph);
}
}
e.printStackTrace();
}
} else {
- composite.run(graph);
+ composite.accept(graph);
}
}
- composite.run(graph);
+ composite.accept(graph);
}
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
@Override
- public void run(ReadGraphImpl graph) {
+ public void accept(ReadGraphImpl graph) {
if(ready.decrementAndGet() == 0) {
- runnable.run(graph);
+ runnable.accept(graph);
}
}
e.printStackTrace();
}
} else {
- composite.run(graph);
+ composite.accept(graph);
}
}
- composite.run(graph);
+ composite.accept(graph);
}
return;
WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
+ reactionGraph.asyncBarrier.inc();
try {
try {
l.graphChanged(e2);
} catch (Throwable ex) {
- ex.printStackTrace();
+ LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
}
}
} finally {
+ reactionGraph.asyncBarrier.dec();
+
}
} catch (Throwable t) {
- t.printStackTrace();
+ LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
}
}
*/
@SuppressWarnings("unchecked")
@Override
- public synchronized <T> T peekService(Class<T> api) {
-
- if(serviceKey1 == api) {
- return (T)service1;
- } else if (serviceKey2 == api) {
- // Promote this key
- Object result = service2;
- service2 = service1;
- serviceKey2 = serviceKey1;
- service1 = result;
- serviceKey1 = api;
- return (T)result;
- }
+ public <T> T peekService(Class<T> api) {
+ if (Layer0.class == api)
+ return (T) L0;
- if (Layer0.class == api)
- return (T) L0;
- if (ServerInformation.class == api)
- return (T) getCachedServerInformation();
- else if (WriteGraphImpl.class == api)
- return (T) writeState.getGraph();
- else if (ClusterBuilder.class == api)
- return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
- else if (ClusterBuilderFactory.class == api)
- return (T)new ClusterBuilderFactoryImpl(this);
+ synchronized (this) {
+ if (serviceKey1 == api) {
+ return (T) service1;
+ }
+ if (serviceKey2 == api) {
+ // Promote this key
+ Object result = service2;
+ service2 = service1;
+ serviceKey2 = serviceKey1;
+ service1 = result;
+ serviceKey1 = api;
+ return (T)result;
+ }
- service2 = service1;
- serviceKey2 = serviceKey1;
+ if (ServerInformation.class == api)
+ return (T) getCachedServerInformation();
+ else if (WriteGraphImpl.class == api)
+ return (T) writeState.getGraph();
+ else if (ClusterBuilder.class == api)
+ return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
+ else if (ClusterBuilderFactory.class == api)
+ return (T)new ClusterBuilderFactoryImpl(this);
- service1 = serviceLocator.peekService(api);
- serviceKey1 = api;
+ service2 = service1;
+ serviceKey2 = serviceKey1;
- return (T)service1;
+ service1 = serviceLocator.peekService(api);
+ serviceKey1 = api;
+ return (T)service1;
+ }
}
/*
try {
h.valuesChanged(ctx);
} catch (Exception e) {
- Logger.defaultLogError("monitor handler notification produced the following exception", e);
+ LOGGER.error("monitor handler notification produced the following exception", e);
} catch (LinkageError e) {
- Logger.defaultLogError("monitor handler notification produced a linkage error", e);
+ LOGGER.error("monitor handler notification produced a linkage error", e);
}
}
}
final ArrayList<T> result = new ArrayList<T>();
final DataContainer<Throwable> exception = new DataContainer<Throwable>();
- syncRequest(request, new AsyncMultiProcedure<T>() {
+ syncRequest(request, new SyncMultiProcedure<T>() {
@Override
- public void execute(AsyncReadGraph graph, T t) {
+ public void execute(ReadGraph graph, T t) {
synchronized(result) {
result.add(t);
}
}
@Override
- public void finished(AsyncReadGraph graph) {
+ public void finished(ReadGraph graph) {
}
@Override
- public void exception(AsyncReadGraph graph, Throwable t) {
+ public void exception(ReadGraph graph, Throwable t) {
exception.set(t);
}
}
@Override
- public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
assertNotSession();
- return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+ throw new Error("Not implemented!");
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ return syncRequest(request, (SyncMultiProcedure<T>)procedure);
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
- }
-
- @Override
- public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
- assertNotSession();
- return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
}
@Override
});
Throwable t = exception.get();
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+ if (t != null) {
+ if (t instanceof DatabaseException)
+ throw (DatabaseException) t;
+ else
+ throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
}
return result;
assert(request != null);
- asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+ asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
@Override
- public void exception(AsyncReadGraph graph, Throwable t) {
+ public void exception(ReadGraph graph, Throwable t) {
t.printStackTrace();
}
});
}
- @Override
- public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
- asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
- }
-
@Override
public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
- asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ asyncRequest(request, (SyncMultiProcedure<T>)procedure);
}
@Override
public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
- asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+ asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
}
@Override
public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
- asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ scheduleRequest(request, procedure, null);
}
@Override
public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
- asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
}
@Override
asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
}
- @Override
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
- assertNotSession();
- throw new Error("Not implemented!");
- }
-
- @Override
- public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
- throw new Error("Not implemented!");
- }
-
@Override
final public <T> void asyncRequest(final ExternalRead<T> request) {
asyncRequest(request, (Procedure<T>)procedure);
}
-
-
- void check(Throwable t) throws DatabaseException {
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception", t);
- }
- }
-
- void check(DataContainer<Throwable> container) throws DatabaseException {
- Throwable t = container.get();
- if(t != null) {
- if(t instanceof DatabaseException) throw (DatabaseException)t;
- else throw new DatabaseException("Unexpected exception", t);
- }
- }
-
-
-
-
-
-
boolean sameProvider(Write request) {
if(writeState.getGraph().provider != null) {
return writeState.getGraph().provider.equals(request.getProvider());
public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
+
private void assertNotSession() throws DatabaseException {
Thread current = Thread.currentThread();
- if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
+ if (sessionThreads.contains(current))
+ throw new ServiceException("Caller is already inside a transaction.");
}
void assertAlive() {
while(dirtyPrimitives) {
dirtyPrimitives = false;
- getQueryProvider2().performDirtyUpdates(writer);
- getQueryProvider2().performScheduledUpdates(writer);
+ getQueryProvider2().propagateChangesInQueryCache(writer);
+ getQueryProvider2().listening.fireListeners(writer);
}
fireMetadataListeners(writer, clientChanges);
state.setCombine(false);
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public <T> T l0() {
+ return (T) L0;
+ }
+
}