import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
import org.eclipse.core.runtime.Platform;
import org.simantics.databoard.Bindings;
import org.simantics.db.AsyncReadGraph;
import org.simantics.db.ChangeSet;
import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
import org.simantics.db.ExternalValueSupport;
import org.simantics.db.Metadata;
import org.simantics.db.MonitorContext;
import org.simantics.db.impl.internal.RandomAccessValueSupport;
import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
import org.simantics.db.impl.query.QueryProcessor;
import org.simantics.db.impl.query.QueryProcessor.SessionRead;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
import org.simantics.layer0.Layer0;
import org.simantics.utils.DataContainer;
import org.simantics.utils.Development;
-import org.simantics.utils.datastructures.Callback;
import org.simantics.utils.threads.logger.ITask;
import org.simantics.utils.threads.logger.ThreadLogger;
private Object result;
final Semaphore sema = new Semaphore(0);
private Throwable throwable = null;
- final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- synchronized (TaskHelper.this) {
- throwable = e;
- }
+ final Consumer<DatabaseException> callback = e -> {
+ synchronized (TaskHelper.this) {
+ throwable = e;
}
};
final Procedure<Object> proc = new Procedure<Object>() {
@Override
public void execute(Object result) {
- callback.run(null);
+ callback.accept(null);
}
@Override
public void exception(Throwable t) {
if (t instanceof DatabaseException)
- callback.run((DatabaseException)t);
+ callback.accept((DatabaseException)t);
else
- callback.run(new DatabaseException("" + name + "operation failed.", t));
+ callback.accept(new DatabaseException("" + name + "operation failed.", t));
}
};
final WriteTraits writeTraits = new WriteTraits() {};
TaskHelper(String name) {
this.name = name;
}
+ @SuppressWarnings("unchecked")
<T> T getResult() {
return (T)result;
}
return null;
}
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
* @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
*/
@Override
- public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
assert (request != null);
try {
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
VirtualGraph vg = getProvider(request.getProvider());
@Override
public void execute(Object result) {
- if(callback != null) callback.run(null);
+ if(callback != null) callback.accept(null);
}
@Override
public void exception(Throwable t) {
- if(callback != null) callback.run((DatabaseException)t);
+ if(callback != null) callback.accept((DatabaseException)t);
}
});
}
- assert(!queryProvider2.dirty);
+ assert(!queryProvider2.cache.dirty);
} catch (Throwable e) {
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
VirtualGraph vg = getProvider(request.getProvider());
}
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
* @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
*/
@Override
- public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
@Override
public void execute(Object result) {
if (callback != null)
- callback.run(null);
+ callback.accept(null);
}
@Override
public void exception(Throwable t) {
if (callback != null) {
- if (t instanceof DatabaseException) callback.run((DatabaseException) t);
- else callback.run(new DatabaseException(t));
+ if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
+ else callback.accept(new DatabaseException(t));
} else
Logger.defaultLogError("Unhandled exception", t);
}
} catch (Throwable e) {
delayedWriteState.except(e);
total.finish();
+ dwg.close();
return;
} finally {
// newGraph.state.barrier.dec();
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
private void maintainCluster(ClusterImpl before, ClusterI after_) {
if(after_ != null && after_ != before) {
ClusterImpl after = (ClusterImpl)after_;
- if(currentCluster == before) currentCluster = after;
+ if(currentCluster == before) {
+ currentCluster = after;
+ }
clusterTable.replaceCluster(after);
}
}
public int createResourceKey(int foreignCounter) throws DatabaseException {
- if(currentCluster == null)
+ if(currentCluster == null) {
currentCluster = getNewResourceCluster();
+ }
if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
newCluster.foreignLookup = new byte[foreignCounter];
try {
int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
- int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
- int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
- clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
- clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
clusterTranslator.removeStatement(cluster);
queryProvider2.invalidateResource(s);
try {
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+ //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
}
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
scheduleRequest(request, callback, notify, null);
}
@Override
- public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+ public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
assertAlive();
fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
flushCounter = 0;
+ Disposable.safeDispose(clientChanges);
clientChanges = new ClientChangesImpl(SessionImplSocket.this);
acquireWriteOnly();
@Override
public void execute(Object result) {
- if(callback != null) callback.run(null);
+ if(callback != null) callback.accept(null);
}
@Override
public void exception(Throwable t) {
- if(callback != null) callback.run((DatabaseException)t);
+ if(callback != null) callback.accept((DatabaseException)t);
}
});
if(!(e instanceof CancelTransactionException)) {
if (callback != null)
- callback.run(new DatabaseException(e));
+ callback.accept(new DatabaseException(e));
}
writeState.except(e);
if (listener != null) {
try {
- newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+
+ AsyncProcedure ap = new AsyncProcedure<T>() {
@Override
public void exception(AsyncReadGraph graph, Throwable t) {
procedure.execute(graph, t);
}
- }, listener);
+ };
+
+ QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
+
} catch (Throwable t) {
// This is handled by the AsyncProcedure
//Logger.defaultLogError("Internal error", t);
if (listener != null) {
- newGraph.processor.query(newGraph, request, null, procedure, listener);
-
-// newGraph.waitAsync(request);
+ try {
+ QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
+ //newGraph.processor.query(newGraph, request, null, procedure, listener);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
} else {
if (listener != null) {
- newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
- @Override
- public void exception(Throwable t) {
- procedure.exception(t);
- if(throwable != null) {
- throwable.set(t);
- }
- }
-
- @Override
- public void execute(T t) {
- if(result != null) result.set(t);
- procedure.execute(t);
- }
-
- }, listener);
+ try {
+ QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+
+// newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
+//
+// @Override
+// public void exception(Throwable t) {
+// procedure.exception(t);
+// if(throwable != null) {
+// throwable.set(t);
+// }
+// }
+//
+// @Override
+// public void execute(T t) {
+// if(result != null) result.set(t);
+// procedure.execute(t);
+// }
+//
+// }, listener);
// newGraph.waitAsync(request);
assertAlive();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
-
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
-
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
assertNotSession();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
assertAlive();
Semaphore notify = new Semaphore(0);
final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
- scheduleRequest(request, new Callback<DatabaseException>() {
- @Override
- public void run(DatabaseException e) {
- exception.set(e);
- }
- }, notify);
+ scheduleRequest(request, e -> exception.set(e), notify);
acquire(notify, request);
if(exception.get() != null) throw exception.get();
}
}
@Override
- public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
}
@Override
- public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
}
@Override
- public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
+ public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
scheduleRequest(request, callback, null);
return true;
}
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
@Override
- public void run(ReadGraphImpl graph) {
+ public void accept(ReadGraphImpl graph) {
if(ready.decrementAndGet() == 0) {
- runnable.run(graph);
+ runnable.accept(graph);
}
}
e.printStackTrace();
}
} else {
- composite.run(graph);
+ composite.accept(graph);
}
}
- composite.run(graph);
+ composite.accept(graph);
}
- static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
+ static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
- Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+ Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
@Override
- public void run(ReadGraphImpl graph) {
+ public void accept(ReadGraphImpl graph) {
if(ready.decrementAndGet() == 0) {
- runnable.run(graph);
+ runnable.accept(graph);
}
}
e.printStackTrace();
}
} else {
- composite.run(graph);
+ composite.accept(graph);
}
}
- composite.run(graph);
+ composite.accept(graph);
}
public int getAmountOfQueryThreads() {
// This must be a power of two
- return 1;
+ return 4;
// return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
}