package fi.vtt.simantics.procore.internal; import gnu.trove.set.hash.TIntHashSet; import java.io.ByteArrayInputStream; import java.io.InputStream; import java.util.Collection; import org.simantics.db.Resource; import org.simantics.db.Session; import org.simantics.db.Statement; import org.simantics.db.VirtualGraph; import org.simantics.db.WriteGraph; import org.simantics.db.common.StandardStatement; import org.simantics.db.common.request.WriteRequest; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.InvalidResourceReferenceException; import org.simantics.db.exception.ResourceNotFoundException; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterSupport; import org.simantics.db.impl.ForEachObjectContextProcedure; import org.simantics.db.impl.ForEachObjectProcedure; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.TransientGraph; import org.simantics.db.impl.VirtualGraphImpl; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.query.IntProcedure; import org.simantics.db.impl.query.QueryProcessor; import org.simantics.db.impl.query.QuerySupport; import org.simantics.db.impl.support.BuiltinSupport; import org.simantics.db.impl.support.ResourceSupport; import org.simantics.db.procore.cluster.ClusterImpl; import org.simantics.db.procore.cluster.ClusterSmall; import org.simantics.db.service.SerialisationSupport; import org.simantics.utils.DataContainer; import org.simantics.utils.datastructures.Callback; public class QuerySupportImpl implements QuerySupport { final SessionImplSocket session; final State state; final ClusterTable clusterTable; final BuiltinSupport builtinSupport; final ClusterSupport clusterSupport; final ResourceSupport resourceSupport; final SerialisationSupport serializationSupport; final VirtualGraphServerSupportImpl virtualGraphServerSupport; final GraphSession graphSession; final 
SessionRequestManager syncThreads; final int root; private boolean pendingPrimitives = false; QuerySupportImpl(SessionImplSocket session, ClusterSupport clusterSupport, SerialisationSupport serializationSupport, SessionRequestManager syncThreads) { this.session = session; this.state = session.state; this.clusterTable = session.clusterTable; this.resourceSupport = session.resourceSupport; this.virtualGraphServerSupport = session.virtualGraphServerSupport; this.graphSession = session.graphSession; this.builtinSupport = session.builtinSupport; this.clusterSupport = clusterSupport; this.serializationSupport = serializationSupport; this.syncThreads = syncThreads; this.root = getBuiltin("http:/"); assert(this.session != null); assert(this.state != null); assert(this.clusterTable != null); assert(this.resourceSupport != null); assert(this.virtualGraphServerSupport != null); assert(this.graphSession != null); assert(this.builtinSupport != null); assert(this.clusterSupport != null); assert(this.serializationSupport != null); assert(this.syncThreads != null); } @Override public ResourceSupport getSupport() { return resourceSupport; } @Override public Statement getStatement(int s, int p, int o) { return getStatement(null, s, p, o); } @Override public Statement getStatement(ReadGraphImpl graph, int s, int p, int o) { Resource sr = getResource(s); Resource pr = getResource(p); Resource or = getResource(o); return new StandardStatement(sr, pr, or); } @Override public Session getSession() { return session; } @Override public long getClusterId(int id) { ClusterI cluster = clusterTable.getClusterByResourceKey(id); if(cluster == null) return 0; return cluster.getClusterId(); } @Override public boolean isImmutable(int id) { // Virtuals are mutable if(id < 0) return false; // Root library is mutable if(root == id) return false; // Anything goes in service mode if(session.serviceMode > 0) return false; return clusterTable.isImmutable(id); } @Override public int getId(Resource resource) 
{ if (resource instanceof ResourceImpl) return ((ResourceImpl)resource).id; return 0; } @Override public Resource getResource(int id) { try { return serializationSupport.getResource(id); } catch (DatabaseException e) { e.printStackTrace(); } return null; } @Override public boolean resume(ReadGraphImpl graph) { return syncThreads.session.queryProvider2.resume(graph); } // @Override // final public void sync(int resumeThread, final SessionRunnable runnable) { // // syncThreads.session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(resumeThread) { // // @Override // public void run(int thread) { // runnable.run(thread); // } // // }); // // } // @Override // final public int nextSyncThread() { // throw new Error(); //// return syncThreads.nextThread(); // } @Override public void dirtyPrimitives() { session.dirtyPrimitives = true; if(state.getWriteCount() == 0 && !pendingPrimitives) { pendingPrimitives = true; session.asyncRequest(new WriteRequest() { @Override public void perform(WriteGraph graph) throws DatabaseException { pendingPrimitives = false; } }); } } @Override @Deprecated final public void aboutToRead() { } // @Override // public void increaseReferenceCount(int callerThread, int subject) { // if (subject < 0) // return; // ClusterI proxy = clusterTable.getClusterByResourceKey(subject); // if (null == proxy) // return; // proxy.increaseReferenceCount(callerThread, 1); // } // // @Override // public void decreaseReferenceCount(int callerThread, int subject) { // if (subject < 0) // return; // ClusterProxy proxy = clusterTable.getClusterByResourceKey(subject); // if (null == proxy) // return; // proxy.decreaseReferenceCount(callerThread, 1); // } // @Override public void getObjects4(final ReadGraphImpl graph, final int subject, final ForEachObjectProcedure procedure) { if(subject < 0) { for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, procedure.predicateKey)) { // int suggestSchedule = 
graph.processor.processor.resourceThread(id); // if(graph.callerThread == suggestSchedule) { procedure.execute(graph, new ResourceImpl(resourceSupport, id)); // } else { // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id)); // } // // }); // } } } procedure.finished(graph); // graph.dec(); return; } final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(session.clusterTranslator, new Runnable() { @Override public void run() { getObjects4(graph, subject, procedure); } }); return; } if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) { for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, procedure.predicateKey)) { // int suggestSchedule = graph.processor.processor.resourceThread(id); // if(graph.callerThread == suggestSchedule) { procedure.execute(graph, new ResourceImpl(resourceSupport, id)); // } else { // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id)); // } // // }); // } } } try { cluster.forObjects(graph, subject, procedure); } catch (DatabaseException e) { e.printStackTrace(); } } else { try { cluster.forObjects(graph, subject, procedure); } catch (DatabaseException e) { e.printStackTrace(); } } } public void getObjects4(final ReadGraphImpl graph, final int subject, final C context, final ForEachObjectContextProcedure procedure) { if(subject < 0) { for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, procedure.predicateKey)) { // int suggestSchedule = graph.processor.processor.resourceThread(id); // 
if(graph.callerThread == suggestSchedule) { procedure.execute(graph, context, new ResourceImpl(resourceSupport, id)); // } else { // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id)); // } // // }); // } } } procedure.finished(graph); // graph.dec(); return; } final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(session.clusterTranslator, new Runnable() { @Override public void run() { getObjects4(graph, subject, context, procedure); } }); return; } if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) { for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, procedure.predicateKey)) { // int suggestSchedule = graph.processor.processor.resourceThread(id); // if(graph.callerThread == suggestSchedule) { procedure.execute(graph, context, new ResourceImpl(resourceSupport, id)); // } else { // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id)); // } // // }); // } } } try { cluster.forObjects(graph, subject, context, procedure); } catch (DatabaseException e) { e.printStackTrace(); } } else { try { cluster.forObjects(graph, subject, context, procedure); } catch (DatabaseException e) { e.printStackTrace(); } } } @Override public int getSingleInstance(final int subject) { // Do not process this information for virtual resources if(subject < 0) return 0; final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); if (cluster == null) System.out.println("null cluster: " + Integer.toString(subject, 16)); assert (cluster != null); try { ClusterI.CompleteTypeEnum 
type = cluster.getCompleteType(subject, clusterSupport); if(ClusterI.CompleteTypeEnum.InstanceOf == type) { int result = cluster.getCompleteObjectKey(subject, clusterSupport); assert(result > 0); return result; } else { return 0; } } catch (DatabaseException e) { Logger.defaultLogError(e); return 0; } // This happens is the resource is bogus catch (Throwable t) { analyseProblem(cluster); Logger.defaultLogError(t); return 0; } } @Override public int getSingleSuperrelation(final int subject) { // Do not process this information for virtual resources if(subject < 0) return 0; final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); if (cluster == null) System.out.println("null cluster: " + Integer.toString(subject, 16)); assert (cluster != null); try { ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport); if(ClusterI.CompleteTypeEnum.SubrelationOf == type) { int result = cluster.getCompleteObjectKey(subject, clusterSupport); assert(result > 0); return result; } else { return 0; } } catch (DatabaseException e) { Logger.defaultLogError(e); return 0; } } // @Override // public void getSingleSuperrelation(ReadGraphImpl graph, final int subject, final AsyncProcedure procedure) { // // // Do not process this information for virtual resources // if(subject < 0) { // procedure.execute(graph, null); // graph.state.barrier.dec(); // return; // } // // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); // if (cluster == null) // System.out.println("null cluster: " + Integer.toString(subject, 16)); // // assert (cluster != null); // // if(!cluster.isLoaded()) { // // procedure.execute(graph, null); // graph.state.barrier.dec(); // return; // //// queryProvider2.requestCluster(callerThread, cluster.getClusterId(), new Callback() { //// //// @Override //// public void run(Integer i) { //// //// queryProvider2.schedule(i, callerThread, new Runnable() { //// //// @Override //// public void run() { //// //// try { //// 
//// ClusterI.CompleteTypeEnum type = cluster.getCompleteType(callerThread, subject, SessionImplSocket.this); //// if(ClusterI.CompleteTypeEnum.SubrelationOf == type) { //// int result = cluster.getCompleteObjectKey(callerThread, subject, SessionImplSocket.this); //// assert(result > 0); //// procedure.execute(graph, getResourceByKey(result)); //// } else { //// procedure.execute(graph, null); //// } //// graph.state.barrier.dec(); //// //// } catch (DatabaseException e) { //// e.printStackTrace(); //// } //// //// } //// //// }); //// //// } //// //// }); // // } else { // // try { // // ClusterI.CompleteTypeEnum type = cluster.getCompleteType(graph.callerThread, subject, clusterSupport); // if(ClusterI.CompleteTypeEnum.SubrelationOf == type) { // int result = cluster.getCompleteObjectKey(graph.callerThread, subject, clusterSupport); // assert(result > 0); // procedure.execute(graph, new ResourceImpl(resourceSupport, result)); // } else { // procedure.execute(graph, null); // } // graph.state.barrier.dec(); // // } catch (DatabaseException e) { // e.printStackTrace(); // } // // } // // // } // @Override // public void getObjects2(final int callerThread, final int subject, final int predicate, final IntProcedure procedure) { // ensureLoaded(callerThread, subject, predicate, new Runnable() { // // @Override // public void run() { // safeGetObjects2(callerThread, subject, predicate, procedure); // } // // }); // } // public void safeGetObjects2(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) { // // assert (subject != 0); // assert (predicate != 0); //// System.out.println("getObjects2: s=" + subject + "p=" + predicate); // Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); // if (providers != null) { // // final TIntHashSet result = new TIntHashSet(16); // // for (VirtualGraph provider : providers) { // // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { // // if 
(result.add(id)) { // procedure.execute(graph, id); // } // // } // // } // // if (subject < 0) // return; // // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); // // assert (testCluster(subject, cluster)); // // // wheels within wheels // final ClusterI.ObjectProcedure proc = new ClusterI.ObjectProcedure() { // // @Override // public boolean execute(int callerThread, Object context, int object) { // // if (result.add(object)) { // procedure.execute(graph.newAsync(callerThread), object); // } // // return false; // continue looping // // } // // @Override // public boolean found() { // throw new UnsupportedOperationException(); // } // // }; // // try { // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // } catch (Throwable t) { // Logger.defaultLogError(t); // t.printStackTrace(); // } // return; // // } // // assert(subject > 0); // // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); // // assert (testCluster(subject, cluster)); // // try { // cluster.forObjects(graph.callerThread, subject, predicate, new Wheels(procedure), null, clusterSupport); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // } catch (Throwable t) { // t.printStackTrace(); // Logger.defaultLogError(t); // } // // } @Override public boolean getObjects(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) { assert (subject != 0); assert (predicate != 0); if(subject < 0) { boolean found = false; for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, predicate)) { found = true; procedure.execute(graph, id); } } return found; } final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); assert(cluster.isLoaded()); if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) { final DataContainer found = new 
DataContainer(Boolean.FALSE); final TIntHashSet result = new TIntHashSet(5); for(TransientGraph g : virtualGraphServerSupport.providers) { for (final int id : g.getObjects(subject, predicate)) { found.set(true); if (result.add(id)) { procedure.execute(graph, id); } } } // Virtual predicates are not found from persistent clusters if(predicate < 0) return found.get(); // wheels within wheels final ClusterI.ObjectProcedure proc = new ClusterI.ObjectProcedure() { @Override public boolean execute(Object context, int object) { found.set(true); if (result.add(object)) { procedure.execute(graph, object); } return false; // continue looping } }; try { cluster.forObjects(subject, predicate, proc, null, clusterSupport); } catch (DatabaseException e) { e.printStackTrace(); } return found.get(); } else { // Virtual predicates are not found from persistent clusters if(predicate < 0) return false; class A implements ClusterI.ObjectProcedure { boolean found = false; @Override public boolean execute(Object context, int object) { found = true; procedure.execute(graph, object); return false; // continue looping } public boolean found() { return found; } } // wheels within wheels final A proc = new A(); try { cluster.forObjects(subject, predicate, proc, null, clusterSupport); } catch (DatabaseException e) { e.printStackTrace(); } // This happens if resource is bogus catch (Throwable t) { analyseProblem(cluster); Logger.defaultLogError(t); } return proc.found(); } } // @Override // public boolean getFunctionalObject(final ReadGraphImpl graph, final int subject, final int predicate, // final IntProcedure procedure) { // // assert (subject != 0); // assert (predicate != 0); // // if(subject < 0) { // // boolean found = false; // // Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); // if (providers != null) { // // for (VirtualGraph provider : providers) { // // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { // found = true; // 
procedure.execute(graph, id); // } // // } // // } // // return found; // // } // // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); // // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) { // // final DataContainer found = new DataContainer(false); // final TIntHashSet result = new TIntHashSet(5); // // Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); // if (providers != null) { // // for (VirtualGraph provider : providers) { // // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { // found.set(true); // procedure.execute(graph, id); // } // // } // // } // // // wheels within wheels // final ClusterI.ObjectProcedure proc = new ClusterI.ObjectProcedure() { // // @Override // public boolean execute(int callerThread, Object context, int object) { // // found.set(true); // System.out.println("-found object " + object); // if (result.add(object)) { // procedure.execute(graph.newAsync(callerThread), object); // } // // return false; // continue looping // // } // // @Override // public boolean found() { // throw new UnsupportedOperationException(); // } // // }; // // try { // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // return false; // } catch (Throwable t) { // Logger.defaultLogError(t); // t.printStackTrace(); // } // // return found.get(); // // } else { // // // wheels within wheels // final ClusterI.ObjectProcedure proc = new ClusterI.ObjectProcedure() { // // boolean found = false; // // @Override // public boolean execute(int callerThread, Object context, int object) { // // found = true; // procedure.execute(graph.newAsync(callerThread), object); // return false; // continue looping // // } // // @Override // public boolean found() { // return found; // } // // @Override // public String toString() { // return "Wheels for " + procedure; // } 
// // }; // // try { // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // return false; // } catch (Throwable t) { // t.printStackTrace(); // Logger.defaultLogError(t); // return false; // } // // return proc.found(); // // } // // } @Override public int getFunctionalObject(final int subject, final int predicate) { assert (subject != 0); assert (predicate != 0); if(subject < 0) { int found = 0; Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); if (providers != null) { for (VirtualGraph provider : providers) { for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { if(found == 0) found = id; else found = -1; } } } return found; // if(found == -1) return 0; // else return found; } final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) { int result = 0; Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); if (providers != null) { for (VirtualGraph provider : providers) { for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { if(result == 0) result = id; else result = -1; } } } if(result != 0) return result; try { return cluster.getSingleObject(subject, predicate, clusterSupport); } catch (DatabaseException e) { return -1; } } else { try { return cluster.getSingleObject(subject, predicate, clusterSupport); } catch (DatabaseException e) { return -1; } // This comes if the resource is bogus catch (Throwable t) { analyseProblem(cluster); Logger.defaultLogError(t); return -1; } } } // @Override // public boolean getStatements(final ReadGraphImpl graph, final int subject, final int predicate, // final TripleIntProcedure procedure, final Statements entry) { // // assert (subject != 0); // assert (predicate != 0); // // final TIntHashSet result = new TIntHashSet(16); // final 
DataContainer found = new DataContainer(false); // // Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); // // if (providers != null) { // // for (VirtualGraph provider : providers) { // // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) { // // found.set(true); // // if (result.add(id)) { // if (null != entry) { // entry.addOrSet(subject, predicate, id); // procedure.execute(graph, subject, predicate, id); // return true; // break; // } else { // procedure.execute(graph, subject, predicate, id); // } // } // // } // // } // // } // // if (subject < 0) // return found.get(); // // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); // assert (cluster != null); // // // wheels within wheels // final ClusterI.ObjectProcedure proc = new ClusterI.ObjectProcedure() { // @Override // public boolean execute(int callerThread, Object context, int object) { // // found.set(true); // // if (result.add(object)) { // if (null != entry) { // entry.addOrSet(subject, predicate, object); // procedure.execute(graph.newAsync(callerThread), subject, predicate, object); // return true; // break; // } else { // procedure.execute(graph.newAsync(callerThread), subject, predicate, object); // } // } // // return false; // continue looping // // } // // @Override // public boolean found() { // throw new UnsupportedOperationException(); // } // // }; // // try { // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport); // } catch (DatabaseException e) { // Logger.defaultLogError(e); // } // return found.get(); // // } @Override public org.simantics.db.DirectStatements getStatements(final ReadGraphImpl graph, final int subject, final QueryProcessor processor, boolean ignoreVirtual) { assert (subject != 0); final DirectStatementsImpl result = new DirectStatementsImpl(resourceSupport, subject); if (!ignoreVirtual) { Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); 
if (providers != null) { for (TransientGraph provider : providers) { for (int p : provider.getPredicates(subject)) { for (int o : provider.getObjects(subject, p)) { result.addStatement(p, o); } } } } } if (subject < 0) { return result; } else { final ClusterI cluster = clusterTable.getClusterByResourceKey(subject); assert (cluster != null); doGetStatements(graph, cluster, subject, result); } return result; } // @Override // public void getStatements(ReadGraphImpl graph, final int subject, final Procedure procedure) { // // procedure.exception(new DatabaseException("Not supported")); // // } @Override public void getPredicates(final ReadGraphImpl graph, final int subject, final IntProcedure procedure) { final TIntHashSet result = new TIntHashSet(16); Collection providers = virtualGraphServerSupport.getVirtualGraphs(subject); if (providers != null) { for (VirtualGraph provider : providers) { for (int id : ((VirtualGraphImpl)provider).getPredicates(subject)) { if (result.add(id)) { procedure.execute(graph, id); } } } } if (subject < 0) { procedure.finished(graph); return; } ClusterI proxy = clusterTable.getClusterByResourceKey(subject); // if(!proxy.isLoaded()) { // // proxy.load(callerThread, session, new Runnable() { // // @Override // public void run() { // getPredicates(callerThread, subject, procedure); // } // // }); // return; // // } assert (proxy != null); final DataContainer got = new DataContainer(0); ClusterI.PredicateProcedure proc = new ClusterI.PredicateProcedure() { @Override public boolean execute(Object context, int predicate, int oi) { if (result.add(predicate)) { procedure.execute(graph, predicate); } got.set(got.get() + 1); return false; // continue looping } }; try { proxy.forPredicates(subject, proc, null, clusterSupport); } catch (DatabaseException e) { Logger.defaultLogError(e); } procedure.finished(graph); } // @Override // public void getValue(ReadGraphImpl graph, int resource, InternalProcedure procedure) { // // if(resource < 0) { // // 
Collection providers = virtualGraphServerSupport.getVirtualGraphs(resource); // // if (providers != null) { // // for (VirtualGraph provider : providers) { // // Object value = ((VirtualGraphImpl)provider).getValue(resource); // if (value != null) { // procedure.execute(graph, (byte[])value); // return; // } // // } // // } // // return; // // } // // ClusterI cluster = clusterTable.getClusterByResourceKey(resource); // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) { // // Collection providers = virtualGraphServerSupport.getVirtualGraphs(resource); // // if (providers != null) { // // for (VirtualGraph provider : providers) { // // Object value = ((VirtualGraphImpl)provider).getValue(resource); // if (value != null) { // procedure.execute(graph, (byte[])value); // return; // } // // } // // } // // try { // // byte[] data = cluster.getValue(resource, clusterSupport); // if (null == data || 0 == data.length) { // procedure.execute(graph, null); // } else { // procedure.execute(graph, data); // } // // } catch (DatabaseException e) { // Logger.defaultLogError(e); // } // // } else { // // try { // // byte[] data = cluster.getValue(resource, clusterSupport); // if (null == data || 0 == data.length) { // procedure.execute(graph, null); // } else { // procedure.execute(graph, data); // } // // } catch (DatabaseException e) { // Logger.defaultLogError(e); // } // // } // // } @Override public byte[] getValue(ReadGraphImpl graph, int resource) { if(resource < 0) { Collection providers = virtualGraphServerSupport.getVirtualGraphs(resource); if (providers != null) { for (VirtualGraph provider : providers) { Object value = ((VirtualGraphImpl)provider).getValue(resource); if (value != null) { return (byte[])value; } } } return null; } ClusterI cluster = clusterTable.getClusterByResourceKey(resource); if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) { Collection providers = 
virtualGraphServerSupport.getVirtualGraphs(resource); if (providers != null) { for (VirtualGraph provider : providers) { Object value = ((VirtualGraphImpl)provider).getValue(resource); if (value != null) { return (byte[])value; } } } try { byte[] data = cluster.getValue(resource, clusterSupport); if (null != data && 0 != data.length) { return data; } } catch (DatabaseException e) { Logger.defaultLogError(e); } return null; } else { try { byte[] data = cluster.getValue(resource, clusterSupport); if (null != data && 0 != data.length) { return data; } } catch (DatabaseException e) { Logger.defaultLogError(e); } return null; } } @Override public InputStream getValueStream(ReadGraphImpl graph, int resource) { if(resource < 0) { Collection providers = virtualGraphServerSupport.getVirtualGraphs(resource); if (providers != null) { for (VirtualGraph provider : providers) { Object value = ((VirtualGraphImpl)provider).getValue(resource); if (value != null) { return new ByteArrayInputStream((byte[])value); } } } return null; } ClusterI cluster = clusterTable.getClusterByResourceKey(resource); if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) { Collection providers = virtualGraphServerSupport.getVirtualGraphs(resource); if (providers != null) { for (VirtualGraph provider : providers) { Object value = ((VirtualGraphImpl)provider).getValue(resource); if (value != null) { return new ByteArrayInputStream((byte[])value); } } } try { return cluster.getValueStream(resource, clusterSupport); } catch (DatabaseException e) { Logger.defaultLogError(e); } return null; } else { try { return cluster.getValueStream(resource, clusterSupport); } catch (DatabaseException e) { Logger.defaultLogError(e); } return null; } } @Override public void requestCluster(ReadGraphImpl graph, final long clusterId, final Runnable r) { class CallbackAdapter implements Callback { final Runnable r; CallbackAdapter(final Runnable r) { this.r = r; } @Override public void 
run(DatabaseException e) { if (null != e) e.printStackTrace(); else r.run(); } } double p = clusterTable.getLoadProbability(); // System.out.print("Load cluster " + clusterId + " with probability " + p + // " -> "); final ClusterI proxy = clusterSupport.getClusterByClusterId(clusterId); if (!proxy.isLoaded()) { clusterTable.gc(); if (Math.random() < p) { proxy.load(new CallbackAdapter(r)); } else { r.run(); } } else { r.run(); } } @Override public int getBuiltin(String uri) { return builtinSupport.getBuiltin(uri); } @Override public void checkTasks() { System.out.println(syncThreads.toString()); } // @Override // public void asyncWrite(Write request) { // //// if(plainWrite(writer) && sameProvider(request)) { //// writer.writeSupport.pushRequest(request); //// } else { // asyncRequest(request); //// } // // } /* * Helpers * * */ private void doGetStatements(ReadGraphImpl graph, final ClusterI cluster, final int subject, final DirectStatementsImpl result) { final class Proc implements ClusterI.PredicateProcedure { @Override public boolean execute(Object context, final int predicate, final int objectIndex) { doExecute(null, predicate, objectIndex); return false; } private void doExecute(Object context, final int predicate, final int objectIndex) { try { cluster.forObjects(subject, predicate, new ClusterI.ObjectProcedure() { @Override public boolean execute(Object context, int object) { result.addStatement(predicate, object); return false; } }, null, clusterSupport); } catch (DatabaseException e) { e.printStackTrace(); } } } try { cluster.forPredicates(subject, new Proc(), null, clusterSupport); } catch (DatabaseException e) { e.printStackTrace(); } } // private void getDirectObjects4(final int callerThread, final ClusterI cluster, final int subject, final int predicate, final QueryProcessor processor, final ReadGraphImpl graph, final ForEachObjectProcedure procedure) { // // if(!cluster.isLoaded()) { // // requestCluster(callerThread, cluster.getClusterId(), new 
// Callback() {
    //
    //     @Override
    //     public void run(Integer i) {
    //
    //         processor.schedule(i, new SessionTask(callerThread) {
    //
    //             @Override
    //             public void run(int thread) {
    //
    //                 getDirectObjects4(thread, cluster, subject, predicate, processor, graph, procedure);
    //
    //             }
    //
    //         });
    //
    //     }
    //
    // });
    //
    // } else {
    //
    //     try {
    //         cluster.forObjects(graph, subject, predicate, procedure);
    //     } catch (DatabaseException e) {
    //         e.printStackTrace();
    //     }
    //
    ////     procedure.finished(graph);
    ////     graph.state.barrier.dec();
    ////
    ////     System.err.println("ai2=" + ai2.decrementAndGet());
    //
    // }
    //
    //
    // }
    // AtomicInteger ai2 =new AtomicInteger(0);
    // private boolean testCluster(int subject, ClusterI proxy) {
    //
    //     if (proxy == null)
    //         System.out.println("null cluster: " + Integer.toString(subject, 16));
    //
    //     return proxy != null;
    //
    // }

    /**
     * Returns the id of the cluster that holds the given resource key.
     *
     * @param id resource key; a negative key is treated as a virtual resource
     * @return the cluster id, or {@code 0} when the key is negative or the
     *         cluster table has no cluster for it
     */
    long getCluster(int id) {
        // Virtual resource
        if (id < 0) {
            return 0;
        }
        ClusterI proxy = clusterTable.getClusterByResourceKey(id);
        return proxy == null ? 0 : proxy.getClusterId();
    }

    /**
     * Resolves a serialized resource reference to its integer resource key.
     *
     * @param id serialized resource reference understood by the session's
     *           resource serializer
     * @return the resource key, or {@code 0} when the serializer returns
     *         {@code null} or rejects the reference as invalid
     */
    @Override
    public int getRandomAccessReference(String id) throws ResourceNotFoundException {
        try {
            Resource res = serializationSupport.getResourceSerializer().getResource(id);
            return res == null ? 0 : ((ResourceImpl) res).id;
        } catch (InvalidResourceReferenceException ignored) {
            // Deliberately best-effort: an invalid reference is reported to the
            // caller as "no resource" (0) rather than as an error.
            return 0;
        }
    }

    /**
     * Makes sure the statements of {@code subject} with {@code predicate} are
     * available: virtual statements are requested for negative (virtual)
     * subjects, and for persistent subjects the owning cluster is loaded
     * first when necessary.
     *
     * @param graph     graph handed on to the virtual statement loader
     * @param subject   resource key of the subject
     * @param predicate resource key of the predicate
     */
    @Override
    public void ensureLoaded(final ReadGraphImpl graph, final int subject, final int predicate) {
        if (subject < 0) {
            SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate,
                    noOpCallback());
            return;
        }

        final ClusterI cluster = clusterTable.checkedGetClusterByResourceKey(subject);
        if (cluster.isLoaded()) {
            loadPendingVirtualStatements(graph, cluster, subject, predicate);
            return;
        }

        // Diagnostic kept from the original code: dumps the call stack whenever
        // an unloaded cluster is reached through this path.
        new Exception().printStackTrace();
        cluster.load(session.clusterTranslator, new Runnable() {
            @Override
            public void run() {
                loadPendingVirtualStatements(graph, cluster, subject, predicate);
            }
        });
    }

    /**
     * Makes sure the statements of {@code subject} are available: virtual
     * statements are requested for negative (virtual) subjects, and for
     * persistent subjects the owning cluster is loaded first when necessary.
     *
     * @param graph   graph handed on to the virtual statement loader
     * @param subject resource key of the subject
     */
    @Override
    public void ensureLoaded(final ReadGraphImpl graph, final int subject) {
        if (subject < 0) {
            SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, noOpCallback());
            return;
        }

        // NOTE(review): the predicate-specific overload uses
        // checkedGetClusterByResourceKey here; confirm whether the unchecked
        // lookup is intentional.
        final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
        if (cluster.isLoaded()) {
            loadPendingVirtualStatements(graph, cluster, subject);
            return;
        }

        // System.err.println("cluster not loaded " + subject);
        // Diagnostic kept from the original code: dumps the call stack whenever
        // an unloaded cluster is reached through this path.
        new Exception().printStackTrace();
        cluster.load(session.clusterTranslator, new Runnable() {
            @Override
            public void run() {
                loadPendingVirtualStatements(graph, cluster, subject);
            }
        });
    }

    /**
     * Requests the virtual statements of a persistent subject for one
     * predicate, but only if the cluster reports virtual content, the subject
     * is registered in the virtual set and those statements are not loaded yet.
     */
    private void loadPendingVirtualStatements(ReadGraphImpl graph, ClusterI cluster, int subject, int predicate) {
        if (cluster.hasVirtual()
                && virtualGraphServerSupport.virtuals.contains(subject)
                && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
            SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate,
                    noOpCallback());
        }
    }

    /**
     * Requests all virtual statements of a persistent subject, but only if the
     * cluster reports virtual content, the subject is registered in the
     * virtual set and those statements are not loaded yet.
     */
    private void loadPendingVirtualStatements(ReadGraphImpl graph, ClusterI cluster, int subject) {
        if (cluster.hasVirtual()
                && virtualGraphServerSupport.virtuals.contains(subject)
                && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
            SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, noOpCallback());
        }
    }

    /**
     * Creates a completion callback that does nothing. The explicit type
     * argument is required: an anonymous raw {@code Callback} cannot declare
     * {@code @Override run(ReadGraphImpl)}.
     */
    private static Callback<ReadGraphImpl> noOpCallback() {
        return new Callback<ReadGraphImpl>() {
            @Override
            public void run(ReadGraphImpl parameter) {
                // Nothing to do once the statements have been loaded.
            }
        };
    }

    /**
     * Tells whether the cluster holding {@code subject} is loaded.
     *
     * @param subject resource key of the subject
     * @return the cluster's own {@code isLoaded()} answer
     */
    @Override
    public boolean isLoaded(int subject) {
        // NOTE(review): getCluster(int) treats a null lookup result as possible;
        // here a null cluster would raise a NullPointerException - confirm that
        // callers only pass keys of known persistent resources.
        ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
        return cluster.isLoaded();
    }

    /** Forwards the notification for {@code thread} to the session. */
    @Override
    public void ceased(int thread) {
        session.ceased(thread);
    }

    /** Returns the session's request manager, which is used as the lock object. */
    @Override
    public Object getLock() {
        return session.requestManager;
    }

    /**
     * Finds the virtual graph that contains the statement
     * {@code (subject, predicate, object)}.
     *
     * @return the first provider whose objects for {@code (subject, predicate)}
     *         include {@code object}, or {@code null} if none does or the
     *         subject is a persistent resource without virtual statements
     */
    @Override
    public VirtualGraph getProvider(int subject, int predicate, int object) {
        if (isPersistentWithoutVirtuals(subject)) {
            return null;
        }
        for (TransientGraph g : virtualGraphServerSupport.providers) {
            for (final int id : g.getObjects(subject, predicate)) {
                if (object == id) {
                    return g;
                }
            }
        }
        // Nothing found from virtual graphs
        return null;
    }

    /**
     * Finds the single virtual graph that has objects for
     * {@code (subject, predicate)}.
     *
     * @return that provider, or {@code null} if no provider or more than one
     *         provider has such objects, or the subject is a persistent
     *         resource without virtual statements
     */
    @Override
    public VirtualGraph getProvider(int subject, int predicate) {
        if (isPersistentWithoutVirtuals(subject)) {
            return null;
        }
        TransientGraph result = null;
        for (TransientGraph g : virtualGraphServerSupport.providers) {
            if (g.getObjects(subject, predicate).length > 0) {
                // Found multiple, return null;
                if (result != null) {
                    return null;
                }
                result = g;
            }
        }
        return result;
    }

    /**
     * Finds the single virtual graph that holds a value for {@code subject}.
     *
     * @return that provider, or {@code null} if no provider or more than one
     *         provider has a value, or the subject is a persistent resource
     *         without virtual statements
     */
    @Override
    public VirtualGraph getValueProvider(int subject) {
        if (isPersistentWithoutVirtuals(subject)) {
            return null;
        }
        TransientGraph result = null;
        for (TransientGraph g : virtualGraphServerSupport.providers) {
            if (g.getValue(subject) != null) {
                if (result != null) {
                    return null;
                }
                result = g;
            }
        }
        return result;
    }

    /**
     * Tells whether {@code subject} is a persistent resource (positive key)
     * that has no virtual statements, in which case no virtual graph can be
     * its provider. The cluster is used through {@code ClusterI}, which already
     * offers {@code hasVirtual()}, so no downcast is needed.
     */
    private boolean isPersistentWithoutVirtuals(int subject) {
        if (subject <= 0) {
            return false;
        }
        ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
        // This persistent resource does not have virtual statements => must deny in persistent graph
        return !cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject);
    }

    /**
     * Closes the session state. The throwable argument is not inspected.
     */
    @Override
    public void exit(Throwable t) {
        state.close();
    }

    /**
     * Runs the consistency check of a {@code ClusterSmall}; other cluster
     * kinds are ignored. A failing check is printed to standard error.
     */
    private void analyseProblem(ClusterI cluster) {
        if (cluster instanceof ClusterSmall) {
            try {
                ((ClusterSmall) cluster).check();
            } catch (DatabaseException e) {
                e.printStackTrace();
            }
        }
    }

}