1 package fi.vtt.simantics.procore.internal;
3 import java.io.ByteArrayInputStream;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.function.Consumer;
8 import org.simantics.db.Resource;
9 import org.simantics.db.Session;
10 import org.simantics.db.Statement;
11 import org.simantics.db.VirtualGraph;
12 import org.simantics.db.WriteGraph;
13 import org.simantics.db.common.StandardStatement;
14 import org.simantics.db.common.request.WriteRequest;
15 import org.simantics.db.common.utils.Logger;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.exception.InvalidResourceReferenceException;
18 import org.simantics.db.exception.ResourceNotFoundException;
19 import org.simantics.db.impl.ClusterI;
20 import org.simantics.db.impl.ClusterSupport;
21 import org.simantics.db.impl.ForEachObjectContextProcedure;
22 import org.simantics.db.impl.ForEachObjectProcedure;
23 import org.simantics.db.impl.ResourceImpl;
24 import org.simantics.db.impl.TransientGraph;
25 import org.simantics.db.impl.VirtualGraphImpl;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.query.IntProcedure;
28 import org.simantics.db.impl.query.QueryProcessor;
29 import org.simantics.db.impl.query.QuerySupport;
30 import org.simantics.db.impl.support.BuiltinSupport;
31 import org.simantics.db.impl.support.ResourceSupport;
32 import org.simantics.db.procore.cluster.ClusterImpl;
33 import org.simantics.db.procore.cluster.ClusterSmall;
34 import org.simantics.db.service.SerialisationSupport;
35 import org.simantics.utils.DataContainer;
36 import org.slf4j.LoggerFactory;
38 import gnu.trove.set.hash.TIntHashSet;
40 public class QuerySupportImpl implements QuerySupport {
42 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QuerySupportImpl.class);
44 final SessionImplSocket session;
46 final ClusterTable clusterTable;
47 final BuiltinSupport builtinSupport;
48 final ClusterSupport clusterSupport;
49 final ResourceSupport resourceSupport;
50 final SerialisationSupport serializationSupport;
51 final VirtualGraphServerSupportImpl virtualGraphServerSupport;
52 final GraphSession graphSession;
53 final SessionRequestManager syncThreads;
56 private boolean pendingPrimitives = false;
58 QuerySupportImpl(SessionImplSocket session, ClusterSupport clusterSupport, SerialisationSupport serializationSupport, SessionRequestManager syncThreads) {
59 this.session = session;
60 this.state = session.state;
61 this.clusterTable = session.clusterTable;
62 this.resourceSupport = session.resourceSupport;
63 this.virtualGraphServerSupport = session.virtualGraphServerSupport;
64 this.graphSession = session.graphSession;
65 this.builtinSupport = session.builtinSupport;
66 this.clusterSupport = clusterSupport;
67 this.serializationSupport = serializationSupport;
68 this.syncThreads = syncThreads;
69 this.root = getBuiltin("http:/");
70 assert(this.session != null);
71 assert(this.state != null);
72 assert(this.clusterTable != null);
73 assert(this.resourceSupport != null);
74 assert(this.virtualGraphServerSupport != null);
75 assert(this.graphSession != null);
76 assert(this.builtinSupport != null);
77 assert(this.clusterSupport != null);
78 assert(this.serializationSupport != null);
79 assert(this.syncThreads != null);
83 public ResourceSupport getSupport() {
84 return resourceSupport;
88 public Statement getStatement(int s, int p, int o) {
89 return getStatement(null, s, p, o);
93 public Statement getStatement(ReadGraphImpl graph, int s, int p, int o) {
94 Resource sr = getResource(s);
95 Resource pr = getResource(p);
96 Resource or = getResource(o);
97 return new StandardStatement(sr, pr, or);
101 public Session getSession() {
106 public long getClusterId(int id) {
107 ClusterI cluster = clusterTable.getClusterByResourceKey(id);
108 if(cluster == null) return 0;
109 return cluster.getClusterId();
113 public boolean isImmutable(int id) {
114 // Virtuals are mutable
115 if(id < 0) return false;
116 // Root library is mutable
117 if(root == id) return false;
118 // Anything goes in service mode
119 if(session.serviceMode > 0) return false;
120 return clusterTable.isImmutable(id);
124 public int getId(Resource resource) {
125 if (resource instanceof ResourceImpl)
126 return ((ResourceImpl)resource).id;
131 public Resource getResource(int id) {
133 return serializationSupport.getResource(id);
134 } catch (DatabaseException e) {
141 public boolean resume(ReadGraphImpl graph) {
143 return syncThreads.session.queryProvider2.resume(graph);
148 // final public void sync(int resumeThread, final SessionRunnable runnable) {
150 // syncThreads.session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(resumeThread) {
153 // public void run(int thread) {
154 // runnable.run(thread);
162 // final public int nextSyncThread() {
163 // throw new Error();
164 //// return syncThreads.nextThread();
168 public void dirtyPrimitives() {
169 session.dirtyPrimitives = true;
170 if(state.getWriteCount() == 0 && !pendingPrimitives) {
171 pendingPrimitives = true;
172 session.asyncRequest(new WriteRequest() {
175 public void perform(WriteGraph graph) throws DatabaseException {
176 pendingPrimitives = false;
185 final public void aboutToRead() {
189 // public void increaseReferenceCount(int callerThread, int subject) {
192 // ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
193 // if (null == proxy)
195 // proxy.increaseReferenceCount(callerThread, 1);
199 // public void decreaseReferenceCount(int callerThread, int subject) {
202 // ClusterProxy proxy = clusterTable.getClusterByResourceKey(subject);
203 // if (null == proxy)
205 // proxy.decreaseReferenceCount(callerThread, 1);
209 public void getObjects4(final ReadGraphImpl graph, final int subject, final ForEachObjectProcedure procedure) {
213 for(TransientGraph g : virtualGraphServerSupport.providers) {
214 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
216 // int suggestSchedule = graph.processor.processor.resourceThread(id);
217 // if(graph.callerThread == suggestSchedule) {
219 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
220 } catch (DatabaseException e) {
221 LOGGER.error("Unexpected exception while handling object", e);
224 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
227 // public void run(int thread) {
228 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
237 procedure.finished(graph);
238 } catch (DatabaseException e) {
239 LOGGER.error("Unexpected exception while handling objects", e);
246 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
247 if(!cluster.isLoaded()) {
250 } catch (DatabaseException e) {
251 LOGGER.error("Unexpected exception while handling objects", e);
253 getObjects4(graph, subject, procedure);
257 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
259 for(TransientGraph g : virtualGraphServerSupport.providers) {
260 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
262 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
263 } catch (DatabaseException e) {
264 LOGGER.error("Unexpected exception while handling objects", e);
268 // int suggestSchedule = graph.processor.processor.resourceThread(id);
269 // if(graph.callerThread == suggestSchedule) {
271 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
274 // public void run(int thread) {
275 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
284 cluster.forObjects(graph, subject, procedure);
285 } catch (DatabaseException e) {
292 cluster.forObjects(graph, subject, procedure);
293 } catch (DatabaseException e) {
301 public <C> void getObjects4(final ReadGraphImpl graph, final int subject, final C context, final ForEachObjectContextProcedure<C> procedure) {
305 for(TransientGraph g : virtualGraphServerSupport.providers) {
306 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
308 // int suggestSchedule = graph.processor.processor.resourceThread(id);
309 // if(graph.callerThread == suggestSchedule) {
312 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
313 } catch (DatabaseException e) {
314 LOGGER.error("Unexpected exception while handling objects", e);
320 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
323 // public void run(int thread) {
324 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
334 procedure.finished(graph, context);
335 } catch (DatabaseException e) {
336 LOGGER.error("Unexpected exception while handling objects", e);
345 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
346 if(!cluster.isLoaded()) {
347 cluster.load(session.clusterTranslator, new Runnable() {
351 getObjects4(graph, subject, context, procedure);
358 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
360 for(TransientGraph g : virtualGraphServerSupport.providers) {
361 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
362 // int suggestSchedule = graph.processor.processor.resourceThread(id);
363 // if(graph.callerThread == suggestSchedule) {
365 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
366 } catch (DatabaseException e) {
367 LOGGER.error("Unexpected exception while handling objects", e);
370 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
373 // public void run(int thread) {
374 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
383 cluster.forObjects(graph, subject, context, procedure);
384 } catch (DatabaseException e) {
391 cluster.forObjects(graph, subject, context, procedure);
392 } catch (DatabaseException e) {
401 public int getSingleInstance(final int subject) {
403 // Do not process this information for virtual resources
404 if(subject < 0) return 0;
406 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
408 System.out.println("null cluster: " + Integer.toString(subject, 16));
409 assert (cluster != null);
413 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
414 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
415 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
422 } catch (DatabaseException e) {
424 Logger.defaultLogError(e);
428 // This happens is the resource is bogus
429 catch (Throwable t) {
431 analyseProblem(cluster);
433 Logger.defaultLogError(t);
442 public int getSingleSuperrelation(final int subject) {
444 // Do not process this information for virtual resources
445 if(subject < 0) return 0;
447 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
449 System.out.println("null cluster: " + Integer.toString(subject, 16));
450 assert (cluster != null);
454 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
455 if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
456 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
463 } catch (DatabaseException e) {
465 Logger.defaultLogError(e);
473 public boolean getObjects(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) throws DatabaseException {
475 assert (subject != 0);
476 assert (predicate != 0);
480 boolean found = false;
482 for(TransientGraph g : virtualGraphServerSupport.providers) {
483 for (final int id : g.getObjects(subject, predicate)) {
485 procedure.execute(graph, id);
493 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
494 assert(cluster.isLoaded());
496 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
498 final DataContainer<Boolean> found = new DataContainer<Boolean>(Boolean.FALSE);
499 final TIntHashSet result = new TIntHashSet(5);
501 for(TransientGraph g : virtualGraphServerSupport.providers) {
502 for (final int id : g.getObjects(subject, predicate)) {
505 if (result.add(id)) {
506 procedure.execute(graph, id);
512 // Virtual predicates are not found from persistent clusters
513 if(predicate < 0) return found.get();
515 // wheels within wheels
516 final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
519 public boolean execute(Object context, int object) {
522 if (result.add(object)) {
524 procedure.execute(graph, object);
525 } catch (DatabaseException e) {
526 Logger.defaultLogError(e);
530 return false; // continue looping
537 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
538 } catch (DatabaseException e) {
546 // Virtual predicates are not found from persistent clusters
547 if(predicate < 0) return false;
549 class A implements ClusterI.ObjectProcedure<Object> {
551 boolean found = false;
554 public boolean execute(Object context, int object) {
558 procedure.execute(graph, object);
559 } catch (DatabaseException e) {
560 Logger.defaultLogError(e);
562 return false; // continue looping
566 public boolean found() {
572 // wheels within wheels
573 final A proc = new A();
576 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
577 } catch (DatabaseException e) {
580 // This happens if resource is bogus
581 catch (Throwable t) {
583 analyseProblem(cluster);
585 Logger.defaultLogError(t);
596 // public boolean getFunctionalObject(final ReadGraphImpl graph, final int subject, final int predicate,
597 // final IntProcedure procedure) {
599 // assert (subject != 0);
600 // assert (predicate != 0);
604 // boolean found = false;
606 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
607 // if (providers != null) {
609 // for (VirtualGraph provider : providers) {
611 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
613 // procedure.execute(graph, id);
624 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
626 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
628 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
629 // final TIntHashSet result = new TIntHashSet(5);
631 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
632 // if (providers != null) {
634 // for (VirtualGraph provider : providers) {
636 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
638 // procedure.execute(graph, id);
645 // // wheels within wheels
646 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
649 // public boolean execute(int callerThread, Object context, int object) {
652 // System.out.println("-found object " + object);
653 // if (result.add(object)) {
654 // procedure.execute(graph.newAsync(callerThread), object);
657 // return false; // continue looping
662 // public boolean found() {
663 // throw new UnsupportedOperationException();
669 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
670 // } catch (DatabaseException e) {
671 // Logger.defaultLogError(e);
673 // } catch (Throwable t) {
674 // Logger.defaultLogError(t);
675 // t.printStackTrace();
678 // return found.get();
682 // // wheels within wheels
683 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
685 // boolean found = false;
688 // public boolean execute(int callerThread, Object context, int object) {
691 // procedure.execute(graph.newAsync(callerThread), object);
692 // return false; // continue looping
697 // public boolean found() {
702 // public String toString() {
703 // return "Wheels for " + procedure;
709 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
710 // } catch (DatabaseException e) {
711 // Logger.defaultLogError(e);
713 // } catch (Throwable t) {
714 // t.printStackTrace();
715 // Logger.defaultLogError(t);
719 // return proc.found();
726 public int getFunctionalObject(final int subject, final int predicate) {
728 assert (subject != 0);
729 assert (predicate != 0);
735 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
736 if (providers != null) {
738 for (VirtualGraph provider : providers) {
740 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
741 if(found == 0) found = id;
750 // if(found == -1) return 0;
751 // else return found;
755 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
757 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
760 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
761 if (providers != null) {
763 for (VirtualGraph provider : providers) {
765 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
766 if(result == 0) result = id;
774 if(result != 0) return result;
777 return cluster.getSingleObject(subject, predicate, clusterSupport);
778 } catch (DatabaseException e) {
785 return cluster.getSingleObject(subject, predicate, clusterSupport);
786 } catch (DatabaseException e) {
789 // This comes if the resource is bogus
790 catch (Throwable t) {
792 analyseProblem(cluster);
794 Logger.defaultLogError(t);
805 // public boolean getStatements(final ReadGraphImpl graph, final int subject, final int predicate,
806 // final TripleIntProcedure procedure, final Statements entry) {
808 // assert (subject != 0);
809 // assert (predicate != 0);
811 // final TIntHashSet result = new TIntHashSet(16);
812 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
814 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
816 // if (providers != null) {
818 // for (VirtualGraph provider : providers) {
820 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
824 // if (result.add(id)) {
825 // if (null != entry) {
826 // entry.addOrSet(subject, predicate, id);
827 // procedure.execute(graph, subject, predicate, id);
828 // return true; // break;
830 // procedure.execute(graph, subject, predicate, id);
841 // return found.get();
843 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
844 // assert (cluster != null);
846 // // wheels within wheels
847 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
849 // public boolean execute(int callerThread, Object context, int object) {
853 // if (result.add(object)) {
854 // if (null != entry) {
855 // entry.addOrSet(subject, predicate, object);
856 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
857 // return true; // break;
859 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
863 // return false; // continue looping
868 // public boolean found() {
869 // throw new UnsupportedOperationException();
875 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
876 // } catch (DatabaseException e) {
877 // Logger.defaultLogError(e);
879 // return found.get();
884 public org.simantics.db.DirectStatements getStatements(final ReadGraphImpl graph, final int subject, final QueryProcessor processor, boolean ignoreVirtual) {
886 assert (subject != 0);
888 final DirectStatementsImpl result = new DirectStatementsImpl(resourceSupport, subject);
890 if (!ignoreVirtual) {
891 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
892 if (providers != null) {
893 for (TransientGraph provider : providers) {
894 for (int p : provider.getPredicates(subject)) {
895 for (int o : provider.getObjects(subject, p)) {
896 result.addStatement(p, o);
906 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
907 assert (cluster != null);
908 doGetStatements(graph, cluster, subject, result);
916 public void getPredicates(final ReadGraphImpl graph, final int subject, final IntProcedure procedure) throws DatabaseException {
918 final TIntHashSet result = new TIntHashSet(16);
920 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
922 if (providers != null) {
924 for (VirtualGraph provider : providers) {
926 for (int id : ((VirtualGraphImpl)provider).getPredicates(subject)) {
928 if (result.add(id)) {
929 procedure.execute(graph, id);
939 procedure.finished(graph);
943 ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
944 assert (proxy != null);
946 final DataContainer<Integer> got = new DataContainer<Integer>(0);
947 ClusterI.PredicateProcedure<Object> proc = new ClusterI.PredicateProcedure<Object>() {
949 public boolean execute(Object context, int predicate, int oi) {
950 if (result.add(predicate)) {
952 procedure.execute(graph, predicate);
953 } catch (DatabaseException e) {
954 Logger.defaultLogError(e);
957 got.set(got.get() + 1);
958 return false; // continue looping
962 proxy.forPredicates(subject, proc, null, clusterSupport);
963 } catch (DatabaseException e) {
964 Logger.defaultLogError(e);
966 procedure.finished(graph);
973 // public void getValue(ReadGraphImpl graph, int resource, InternalProcedure<byte[]> procedure) {
975 // if(resource < 0) {
977 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
979 // if (providers != null) {
981 // for (VirtualGraph provider : providers) {
983 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
984 // if (value != null) {
985 // procedure.execute(graph, (byte[])value);
997 // ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
998 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1000 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1002 // if (providers != null) {
1004 // for (VirtualGraph provider : providers) {
1006 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
1007 // if (value != null) {
1008 // procedure.execute(graph, (byte[])value);
1018 // byte[] data = cluster.getValue(resource, clusterSupport);
1019 // if (null == data || 0 == data.length) {
1020 // procedure.execute(graph, null);
1022 // procedure.execute(graph, data);
1025 // } catch (DatabaseException e) {
1026 // Logger.defaultLogError(e);
1033 // byte[] data = cluster.getValue(resource, clusterSupport);
1034 // if (null == data || 0 == data.length) {
1035 // procedure.execute(graph, null);
1037 // procedure.execute(graph, data);
1040 // } catch (DatabaseException e) {
1041 // Logger.defaultLogError(e);
1050 public byte[] getValue(ReadGraphImpl graph, int resource) {
1054 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1056 if (providers != null) {
1058 for (VirtualGraph provider : providers) {
1060 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1061 if (value != null) {
1062 return (byte[])value;
1073 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1074 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1076 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1078 if (providers != null) {
1080 for (VirtualGraph provider : providers) {
1082 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1083 if (value != null) {
1084 return (byte[])value;
1093 byte[] data = cluster.getValue(resource, clusterSupport);
1094 if (null != data && 0 != data.length) {
1098 } catch (DatabaseException e) {
1099 Logger.defaultLogError(e);
1108 byte[] data = cluster.getValue(resource, clusterSupport);
1109 if (null != data && 0 != data.length) {
1113 } catch (DatabaseException e) {
1114 Logger.defaultLogError(e);
1124 public InputStream getValueStream(ReadGraphImpl graph, int resource) {
1128 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1130 if (providers != null) {
1132 for (VirtualGraph provider : providers) {
1134 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1135 if (value != null) {
1136 return new ByteArrayInputStream((byte[])value);
1147 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1148 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1150 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1152 if (providers != null) {
1154 for (VirtualGraph provider : providers) {
1156 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1157 if (value != null) {
1158 return new ByteArrayInputStream((byte[])value);
1167 return cluster.getValueStream(resource, clusterSupport);
1169 } catch (DatabaseException e) {
1170 Logger.defaultLogError(e);
1179 return cluster.getValueStream(resource, clusterSupport);
1181 } catch (DatabaseException e) {
1182 Logger.defaultLogError(e);
1192 public void requestCluster(ReadGraphImpl graph, final long clusterId, final Runnable r) {
1194 class CallbackAdapter implements Consumer<DatabaseException> {
1196 CallbackAdapter(final Runnable r) {
1200 public void accept(DatabaseException e) {
1202 e.printStackTrace();
1209 double p = clusterTable.getLoadProbability();
1210 // System.out.print("Load cluster " + clusterId + " with probability " + p +
1212 final ClusterI proxy = clusterSupport.getClusterByClusterId(clusterId);
1213 if (!proxy.isLoaded()) {
1215 if (Math.random() < p) {
1216 proxy.load(new CallbackAdapter(r));
1227 public int getBuiltin(String uri) {
1228 return builtinSupport.getBuiltin(uri);
1232 public void checkTasks() {
1233 System.out.println(syncThreads.toString());
1237 // public void asyncWrite(Write request) {
1239 //// if(plainWrite(writer) && sameProvider(request)) {
1240 //// writer.writeSupport.pushRequest(request);
1242 // asyncRequest(request);
1253 private void doGetStatements(ReadGraphImpl graph, final ClusterI cluster, final int subject, final DirectStatementsImpl result) {
1255 final class Proc implements ClusterI.PredicateProcedure<Object> {
1258 public boolean execute(Object context, final int predicate, final int objectIndex) {
1260 doExecute(null, predicate, objectIndex);
1265 private void doExecute(Object context, final int predicate, final int objectIndex) {
1268 cluster.forObjects(subject, predicate, new ClusterI.ObjectProcedure<Object>() {
1271 public boolean execute(Object context, int object) {
1272 result.addStatement(predicate, object);
1276 }, null, clusterSupport);
1277 } catch (DatabaseException e) {
1278 e.printStackTrace();
1286 cluster.forPredicates(subject, new Proc(), null, clusterSupport);
1287 } catch (DatabaseException e) {
1288 e.printStackTrace();
1293 // private void getDirectObjects4(final int callerThread, final ClusterI cluster, final int subject, final int predicate, final QueryProcessor processor, final ReadGraphImpl graph, final ForEachObjectProcedure procedure) {
1295 // if(!cluster.isLoaded()) {
1297 // requestCluster(callerThread, cluster.getClusterId(), new Callback<Integer>() {
1300 // public void run(Integer i) {
1302 // processor.schedule(i, new SessionTask(callerThread) {
1305 // public void run(int thread) {
1307 // getDirectObjects4(thread, cluster, subject, predicate, processor, graph, procedure);
1320 // cluster.forObjects(graph, subject, predicate, procedure);
1321 // } catch (DatabaseException e) {
1322 // e.printStackTrace();
1325 //// procedure.finished(graph);
1326 //// graph.state.barrier.dec();
1328 //// System.err.println("ai2=" + ai2.decrementAndGet());
1335 // AtomicInteger ai2 =new AtomicInteger(0);
1337 // private boolean testCluster(int subject, ClusterI proxy) {
1339 // if (proxy == null)
1340 // System.out.println("null cluster: " + Integer.toString(subject, 16));
1342 // return proxy != null;
1346 long getCluster(int id) {
1348 if(id < 0) return 0;
1349 ClusterI proxy = clusterTable.getClusterByResourceKey(id);
1350 if(proxy == null) return 0;
1351 else return proxy.getClusterId();
1355 public int getRandomAccessReference(String id) throws ResourceNotFoundException {
1358 Resource res = serializationSupport.getResourceSerializer().getResource(id);
1359 if(res == null) return 0;
1360 else return ((ResourceImpl)res).id;
1361 } catch (InvalidResourceReferenceException e) {
1362 //e.printStackTrace();
1369 public void ensureLoaded(final ReadGraphImpl graph, final int subject, final int predicate) {
1373 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1377 final ClusterI cluster = clusterTable.checkedGetClusterByResourceKey(subject);
1379 if(cluster.isLoaded()) {
1381 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1383 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1391 new Exception().printStackTrace();
1393 cluster.load(session.clusterTranslator, new Runnable() {
1398 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1400 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1418 public void ensureLoaded(final ReadGraphImpl graph, final int subject) {
1422 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1426 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1428 if(cluster.isLoaded()) {
1430 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1432 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1440 // System.err.println("cluster not loaded " + subject);
1441 new Exception().printStackTrace();
1443 cluster.load(session.clusterTranslator, new Runnable() {
1448 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1450 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1468 public boolean isLoaded(int subject) {
1469 ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1470 return cluster.isLoaded();
1474 public void ceased(int thread) {
1476 session.ceased(thread);
1481 public Object getLock() {
1483 return session.requestManager;
1488 public VirtualGraph getProvider(int subject, int predicate, int object) {
1491 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1492 // This persistent resource does not have virtual statements => must deny in persistent graph
1493 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1496 for(TransientGraph g : virtualGraphServerSupport.providers) {
1497 for (final int id : g.getObjects(subject, predicate)) {
1498 if(object == id) return g;
1502 // Nothing found from virtual graphs
1508 public VirtualGraph getProvider(int subject, int predicate) {
1511 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1512 // This persistent resource does not have virtual statements => must deny in persistent graph
1513 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1516 TransientGraph result = null;
1517 for(TransientGraph g : virtualGraphServerSupport.providers) {
1518 if(g.getObjects(subject, predicate).length > 0) {
1519 // Found multiple, return null;
1520 if(result != null) return null;
1529 public VirtualGraph getValueProvider(int subject) {
1532 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1533 // This persistent resource does not have virtual statements => must deny in persistent graph
1534 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1537 TransientGraph result = null;
1538 for(TransientGraph g : virtualGraphServerSupport.providers) {
1539 if(g.getValue(subject) != null) {
1540 if(result != null) return null;
1549 public void exit(Throwable t) {
1553 private void analyseProblem(ClusterI cluster) {
1555 if(cluster instanceof ClusterSmall)
1557 ((ClusterSmall)cluster).check();
1558 } catch (DatabaseException e) {
1559 e.printStackTrace();