package fi.vtt.simantics.procore.internal; import java.util.Collection; import org.simantics.db.AsyncReadGraph; import org.simantics.db.ReadGraph; import org.simantics.db.Resource; import org.simantics.db.WriteGraph; import org.simantics.db.common.request.WriteRequest; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.procedure.AsyncContextMultiProcedure; import org.simantics.db.procedure.AsyncMultiProcedure; import org.simantics.db.request.ExternalRead; import org.simantics.db.service.QueryControl; import org.simantics.utils.DataContainer; public class QueryControlImpl implements QueryControl { final private SessionImplSocket session; QueryControlImpl(SessionImplSocket session) { this.session = session; } @Override public int getAmountOfQueryThreads() { return session.getAmountOfQueryThreads(); } @Override public int getGraphThread(AsyncReadGraph graph) { return 0; } @Override public int flush() { final DataContainer result = new DataContainer(); try { session.syncRequest(new WriteRequest() { @Override public void perform(WriteGraph graph) throws DatabaseException { result.set(session.queryProvider2.clean()); } }); } catch (DatabaseException e) { e.printStackTrace(); } return result.get(); } @Override public int flush(ReadGraph graph) { return session.queryProvider2.clean(); } @Override public int count() { return session.queryProvider2.querySize(); } @Override public void gc(ReadGraph graph, int allowedTimeInMs) { // 20% young target session.queryProvider2.gc(20, allowedTimeInMs); } @Override public void gc(final Collection> requests) { try { session.syncRequest(new WriteRequest() { @Override public void perform(WriteGraph graph) throws DatabaseException { gc(graph, requests); } }); } catch (DatabaseException e) { e.printStackTrace(); } } @Override public void gc(WriteGraph graph, final Collection> requests) { if (graph == null) throw new IllegalArgumentException("null WriteGraph"); if (requests == null) throw new IllegalArgumentException("null requests"); session.queryProvider2.clean(requests); } @Override public boolean scheduleByCluster(AsyncReadGraph graph, final Resource resource, final AsyncMultiProcedure procedure) { final ReadGraphImpl impl = (ReadGraphImpl)graph; ResourceImpl res = (ResourceImpl)resource; int targetThread = ((res.id >>> 16) & session.queryProvider2.THREAD_MASK); if(0 == targetThread) return true; //System.err.println("scheduleByCluster[" + res.id + "|" + (res.id>>>16) + "] " + impl.callerThread + " -> " + targetThread); // impl.state.barrier.inc(); // // AsyncReadGraph targetGraph = impl.newAsync(); procedure.execute(impl, resource); // impl.state.barrier.dec(); return false; } @Override public boolean scheduleByCluster(AsyncReadGraph graph, final Resource resource, final C context, final AsyncContextMultiProcedure procedure) { final ReadGraphImpl impl = (ReadGraphImpl)graph; ResourceImpl res = (ResourceImpl)resource; int targetThread = ((res.id >>> 16) & session.queryProvider2.THREAD_MASK); if(0 == targetThread) return true; //System.err.println("scheduleByCluster[" + res.id + "|" + (res.id>>>16) + "] " + impl.callerThread + " -> " + targetThread); // impl.state.barrier.inc(); // // AsyncReadGraph targetGraph = impl.newAsync(); procedure.execute(impl, context, resource); // impl.state.barrier.dec(); return false; } @Override public void schedule(AsyncReadGraph graph, int targetThread, final ControlProcedure procedure) { final ReadGraphImpl impl = (ReadGraphImpl)graph; // impl.state.barrier.inc(); // // AsyncReadGraph targetGraph = impl.newAsync(); procedure.execute(impl); // impl.state.barrier.dec(); } @Override public ReadGraph getIndependentGraph(ReadGraph graph) { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.withParent(null); } @Override public boolean hasParentRequest(ReadGraph graph) { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.parent != null; } @Override public boolean resume(AsyncReadGraph graph) { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.processor.querySupport.resume(impl); } }