/*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
* in Industry THTH ry.
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
import org.simantics.db.Metadata;
import org.simantics.db.MonitorContext;
import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
import org.simantics.db.Resource;
import org.simantics.db.ResourceSerializer;
import org.simantics.db.Session;
import org.simantics.db.common.TransactionPolicyRelease;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
import org.simantics.db.exception.RuntimeDatabaseException;
import org.simantics.db.exception.ServiceException;
import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
import org.simantics.db.impl.ClusterBase;
import org.simantics.db.impl.ClusterI;
import org.simantics.db.impl.ClusterTraitsBase;
import org.simantics.db.impl.graph.WriteSupport;
import org.simantics.db.impl.internal.RandomAccessValueSupport;
import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
import org.simantics.db.impl.query.QueryProcessor;
import org.simantics.db.impl.query.QueryProcessor.SessionRead;
import org.simantics.db.impl.query.QueryProcessor.SessionTask;
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+ requestManager.scheduleWrite(new SessionTask(true) {
@Override
public void run(int thread) {
}
- assert(!queryProvider2.dirty);
+ assert(!queryProvider2.cache.dirty);
} catch (Throwable e) {
// state.cancelCommit(context, clusterStream);
// if (!empty) {
// if (!context.isOk()) // this is a blocking operation
-// throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+// throw new InternalException("Cancel failed. This should never happen.");
// getQueryProvider2().performDirtyUpdates(writeState.getGraph());
// }
// state.cancelCommit2(context, clusterStream);
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(true) {
@Override
public void run(int thread) {
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(true) {
@Override
public void run(int thread) {
try {
int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
- int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
- int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
- clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
- clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+ clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
clusterTranslator.removeStatement(cluster);
queryProvider2.invalidateResource(s);
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+ requestManager.scheduleWrite(new SessionTask(true) {
@Override
public void run(int thread) {
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleWrite(new SessionTask(request, thread) {
+ requestManager.scheduleWrite(new SessionTask(true) {
@Override
public void run(int thread) {
assert (request != null);
assert (procedure != null);
- int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+ //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
public void run(int thread) {
if (listener != null) {
try {
- newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+
+ AsyncProcedure ap = new AsyncProcedure<T>() {
@Override
public void exception(AsyncReadGraph graph, Throwable t) {
procedure.execute(graph, t);
}
- }, listener);
+ };
+
+ QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+
} catch (Throwable t) {
// This is handled by the AsyncProcedure
//Logger.defaultLogError("Internal error", t);
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+ requestManager.scheduleRead(new SessionRead(null, notify) {
+
+ @Override
+ public void run(int thread) {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+ try {
+
+ if (listener != null) {
+
+ try {
+ QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+ //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
+ //newGraph.processor.query(newGraph, request, null, procedure, listener);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
+
+ } else {
+
+// final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
+// procedure, "request");
+
+ BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+
+ try {
+
+ request.perform(newGraph, wrap);
+ wrap.get();
+
+ } catch (DatabaseException e) {
+
+ Logger.defaultLogError(e);
+
+ }
+
+ }
+
+ } finally {
+
+ fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+ }
+
+ }
+
+ });
+
+ }
+
+ public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+ assert (request != null);
+ assert (procedure != null);
+
+ int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+ int sync = notify != null ? thread : -1;
+
+ requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
public void run(int thread) {
fireSessionVariableChange(SessionVariables.QUEUED_READS);
+ ListenerBase listener = getListenerBase(procedure);
+
final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
try {
} else {
- final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
- procedure, "request");
+ final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
try {
-// newGraph.state.barrier.inc();
-
request.perform(newGraph, wrapper);
-// newGraph.waitAsync(request);
-
} catch (Throwable t) {
- wrapper.exception(newGraph, t);
-// newGraph.waitAsync(request);
-
+ t.printStackTrace();
}
});
}
-
+
public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
assert (request != null);
int sync = notify != null ? thread : -1;
- requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+ requestManager.scheduleRead(new SessionRead(null, notify) {
@Override
public void run(int thread) {
int thread = request.hashCode() & queryProvider2.THREAD_MASK;
- requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+ requestManager.scheduleRead(new SessionRead(throwable, notify) {
@Override
public void run(int thread) {
if (listener != null) {
- newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
- @Override
- public void exception(Throwable t) {
- procedure.exception(t);
- if(throwable != null) {
- throwable.set(t);
- }
- }
-
- @Override
- public void execute(T t) {
- if(result != null) result.set(t);
- procedure.execute(t);
- }
-
- }, listener);
-
-// newGraph.waitAsync(request);
+ try {
+ QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+ } catch (DatabaseException e) {
+ Logger.defaultLogError(e);
+ }
} else {
final ArrayList<T> result = new ArrayList<T>();
final DataContainer<Throwable> exception = new DataContainer<Throwable>();
- syncRequest(request, new AsyncMultiProcedure<T>() {
+ syncRequest(request, new SyncMultiProcedure<T>() {
@Override
- public void execute(AsyncReadGraph graph, T t) {
+ public void execute(ReadGraph graph, T t) {
synchronized(result) {
result.add(t);
}
}
@Override
- public void finished(AsyncReadGraph graph) {
+ public void finished(ReadGraph graph) {
}
@Override
- public void exception(AsyncReadGraph graph, Throwable t) {
+ public void exception(ReadGraph graph, Throwable t) {
exception.set(t);
}
}
@Override
- public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+ public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
assertNotSession();
- return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+ throw new Error("Not implemented!");
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ return syncRequest(request, (SyncMultiProcedure<T>)procedure);
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
- }
-
- @Override
- public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
- assertNotSession();
- return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
}
@Override
public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
assertNotSession();
- return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
}
@Override
assert(request != null);
- asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+ asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
@Override
- public void exception(AsyncReadGraph graph, Throwable t) {
+ public void exception(ReadGraph graph, Throwable t) {
t.printStackTrace();
}
});
}
- @Override
- public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
- asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
- }
-
@Override
public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
- asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+ asyncRequest(request, (SyncMultiProcedure<T>)procedure);
}
@Override
public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
- asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+ asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
}
@Override
public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
- asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+ scheduleRequest(request, procedure, null);
}
@Override
public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
- asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+ asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
}
@Override
asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
}
- @Override
- public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
- assertNotSession();
- throw new Error("Not implemented!");
- }
-
- @Override
- public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
- throw new Error("Not implemented!");
- }
-
@Override
final public <T> void asyncRequest(final ExternalRead<T> request) {