1 package fi.vtt.simantics.procore.internal;
3 import java.io.ByteArrayInputStream;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.function.Consumer;
8 import org.simantics.db.Resource;
9 import org.simantics.db.Session;
10 import org.simantics.db.Statement;
11 import org.simantics.db.VirtualGraph;
12 import org.simantics.db.WriteGraph;
13 import org.simantics.db.common.StandardStatement;
14 import org.simantics.db.common.request.WriteRequest;
15 import org.simantics.db.common.utils.Logger;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.exception.InvalidResourceReferenceException;
18 import org.simantics.db.exception.ResourceNotFoundException;
19 import org.simantics.db.impl.ClusterI;
20 import org.simantics.db.impl.ClusterSupport;
21 import org.simantics.db.impl.ForEachObjectContextProcedure;
22 import org.simantics.db.impl.ForEachObjectProcedure;
23 import org.simantics.db.impl.ResourceImpl;
24 import org.simantics.db.impl.TransientGraph;
25 import org.simantics.db.impl.VirtualGraphImpl;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.query.IntProcedure;
28 import org.simantics.db.impl.query.QueryProcessor;
29 import org.simantics.db.impl.query.QuerySupport;
30 import org.simantics.db.impl.support.BuiltinSupport;
31 import org.simantics.db.impl.support.ResourceSupport;
32 import org.simantics.db.procore.cluster.ClusterImpl;
33 import org.simantics.db.procore.cluster.ClusterSmall;
34 import org.simantics.db.service.SerialisationSupport;
35 import org.simantics.utils.DataContainer;
37 import gnu.trove.set.hash.TIntHashSet;
39 public class QuerySupportImpl implements QuerySupport {
41 final SessionImplSocket session;
43 final ClusterTable clusterTable;
44 final BuiltinSupport builtinSupport;
45 final ClusterSupport clusterSupport;
46 final ResourceSupport resourceSupport;
47 final SerialisationSupport serializationSupport;
48 final VirtualGraphServerSupportImpl virtualGraphServerSupport;
49 final GraphSession graphSession;
50 final SessionRequestManager syncThreads;
53 private boolean pendingPrimitives = false;
55 QuerySupportImpl(SessionImplSocket session, ClusterSupport clusterSupport, SerialisationSupport serializationSupport, SessionRequestManager syncThreads) {
56 this.session = session;
57 this.state = session.state;
58 this.clusterTable = session.clusterTable;
59 this.resourceSupport = session.resourceSupport;
60 this.virtualGraphServerSupport = session.virtualGraphServerSupport;
61 this.graphSession = session.graphSession;
62 this.builtinSupport = session.builtinSupport;
63 this.clusterSupport = clusterSupport;
64 this.serializationSupport = serializationSupport;
65 this.syncThreads = syncThreads;
66 this.root = getBuiltin("http:/");
67 assert(this.session != null);
68 assert(this.state != null);
69 assert(this.clusterTable != null);
70 assert(this.resourceSupport != null);
71 assert(this.virtualGraphServerSupport != null);
72 assert(this.graphSession != null);
73 assert(this.builtinSupport != null);
74 assert(this.clusterSupport != null);
75 assert(this.serializationSupport != null);
76 assert(this.syncThreads != null);
80 public ResourceSupport getSupport() {
81 return resourceSupport;
85 public Statement getStatement(int s, int p, int o) {
86 return getStatement(null, s, p, o);
90 public Statement getStatement(ReadGraphImpl graph, int s, int p, int o) {
91 Resource sr = getResource(s);
92 Resource pr = getResource(p);
93 Resource or = getResource(o);
94 return new StandardStatement(sr, pr, or);
98 public Session getSession() {
103 public long getClusterId(int id) {
104 ClusterI cluster = clusterTable.getClusterByResourceKey(id);
105 if(cluster == null) return 0;
106 return cluster.getClusterId();
110 public boolean isImmutable(int id) {
111 // Virtuals are mutable
112 if(id < 0) return false;
113 // Root library is mutable
114 if(root == id) return false;
115 // Anything goes in service mode
116 if(session.serviceMode > 0) return false;
117 return clusterTable.isImmutable(id);
121 public int getId(Resource resource) {
122 if (resource instanceof ResourceImpl)
123 return ((ResourceImpl)resource).id;
128 public Resource getResource(int id) {
130 return serializationSupport.getResource(id);
131 } catch (DatabaseException e) {
138 public boolean resume(ReadGraphImpl graph) {
140 return syncThreads.session.queryProvider2.resume(graph);
145 // final public void sync(int resumeThread, final SessionRunnable runnable) {
147 // syncThreads.session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(resumeThread) {
150 // public void run(int thread) {
151 // runnable.run(thread);
159 // final public int nextSyncThread() {
160 // throw new Error();
161 //// return syncThreads.nextThread();
165 public void dirtyPrimitives() {
166 session.dirtyPrimitives = true;
167 if(state.getWriteCount() == 0 && !pendingPrimitives) {
168 pendingPrimitives = true;
169 session.asyncRequest(new WriteRequest() {
172 public void perform(WriteGraph graph) throws DatabaseException {
173 pendingPrimitives = false;
182 final public void aboutToRead() {
186 // public void increaseReferenceCount(int callerThread, int subject) {
189 // ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
190 // if (null == proxy)
192 // proxy.increaseReferenceCount(callerThread, 1);
196 // public void decreaseReferenceCount(int callerThread, int subject) {
199 // ClusterProxy proxy = clusterTable.getClusterByResourceKey(subject);
200 // if (null == proxy)
202 // proxy.decreaseReferenceCount(callerThread, 1);
206 public void getObjects4(final ReadGraphImpl graph, final int subject, final ForEachObjectProcedure procedure) {
210 for(TransientGraph g : virtualGraphServerSupport.providers) {
211 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
213 // int suggestSchedule = graph.processor.processor.resourceThread(id);
214 // if(graph.callerThread == suggestSchedule) {
215 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
217 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
220 // public void run(int thread) {
221 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
229 procedure.finished(graph);
235 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
236 if(!cluster.isLoaded()) {
237 cluster.load(session.clusterTranslator, new Runnable() {
241 getObjects4(graph, subject, procedure);
248 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
250 for(TransientGraph g : virtualGraphServerSupport.providers) {
251 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
252 // int suggestSchedule = graph.processor.processor.resourceThread(id);
253 // if(graph.callerThread == suggestSchedule) {
254 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
256 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
259 // public void run(int thread) {
260 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
269 cluster.forObjects(graph, subject, procedure);
270 } catch (DatabaseException e) {
277 cluster.forObjects(graph, subject, procedure);
278 } catch (DatabaseException e) {
286 public <C> void getObjects4(final ReadGraphImpl graph, final int subject, final C context, final ForEachObjectContextProcedure<C> procedure) {
290 for(TransientGraph g : virtualGraphServerSupport.providers) {
291 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
293 // int suggestSchedule = graph.processor.processor.resourceThread(id);
294 // if(graph.callerThread == suggestSchedule) {
295 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
297 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
300 // public void run(int thread) {
301 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
309 procedure.finished(graph, context);
315 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
316 if(!cluster.isLoaded()) {
317 cluster.load(session.clusterTranslator, new Runnable() {
321 getObjects4(graph, subject, context, procedure);
328 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
330 for(TransientGraph g : virtualGraphServerSupport.providers) {
331 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
332 // int suggestSchedule = graph.processor.processor.resourceThread(id);
333 // if(graph.callerThread == suggestSchedule) {
334 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
336 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
339 // public void run(int thread) {
340 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
349 cluster.forObjects(graph, subject, context, procedure);
350 } catch (DatabaseException e) {
357 cluster.forObjects(graph, subject, context, procedure);
358 } catch (DatabaseException e) {
367 public int getSingleInstance(final int subject) {
369 // Do not process this information for virtual resources
370 if(subject < 0) return 0;
372 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
374 System.out.println("null cluster: " + Integer.toString(subject, 16));
375 assert (cluster != null);
379 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
380 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
381 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
388 } catch (DatabaseException e) {
390 Logger.defaultLogError(e);
394 // This happens is the resource is bogus
395 catch (Throwable t) {
397 analyseProblem(cluster);
399 Logger.defaultLogError(t);
408 public int getSingleSuperrelation(final int subject) {
410 // Do not process this information for virtual resources
411 if(subject < 0) return 0;
413 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
415 System.out.println("null cluster: " + Integer.toString(subject, 16));
416 assert (cluster != null);
420 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
421 if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
422 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
429 } catch (DatabaseException e) {
431 Logger.defaultLogError(e);
439 public boolean getObjects(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) throws DatabaseException {
441 assert (subject != 0);
442 assert (predicate != 0);
446 boolean found = false;
448 for(TransientGraph g : virtualGraphServerSupport.providers) {
449 for (final int id : g.getObjects(subject, predicate)) {
451 procedure.execute(graph, id);
459 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
460 assert(cluster.isLoaded());
462 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
464 final DataContainer<Boolean> found = new DataContainer<Boolean>(Boolean.FALSE);
465 final TIntHashSet result = new TIntHashSet(5);
467 for(TransientGraph g : virtualGraphServerSupport.providers) {
468 for (final int id : g.getObjects(subject, predicate)) {
471 if (result.add(id)) {
472 procedure.execute(graph, id);
478 // Virtual predicates are not found from persistent clusters
479 if(predicate < 0) return found.get();
481 // wheels within wheels
482 final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
485 public boolean execute(Object context, int object) {
488 if (result.add(object)) {
490 procedure.execute(graph, object);
491 } catch (DatabaseException e) {
492 Logger.defaultLogError(e);
496 return false; // continue looping
503 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
504 } catch (DatabaseException e) {
512 // Virtual predicates are not found from persistent clusters
513 if(predicate < 0) return false;
515 class A implements ClusterI.ObjectProcedure<Object> {
517 boolean found = false;
520 public boolean execute(Object context, int object) {
524 procedure.execute(graph, object);
525 } catch (DatabaseException e) {
526 Logger.defaultLogError(e);
528 return false; // continue looping
532 public boolean found() {
538 // wheels within wheels
539 final A proc = new A();
542 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
543 } catch (DatabaseException e) {
546 // This happens if resource is bogus
547 catch (Throwable t) {
549 analyseProblem(cluster);
551 Logger.defaultLogError(t);
562 // public boolean getFunctionalObject(final ReadGraphImpl graph, final int subject, final int predicate,
563 // final IntProcedure procedure) {
565 // assert (subject != 0);
566 // assert (predicate != 0);
570 // boolean found = false;
572 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
573 // if (providers != null) {
575 // for (VirtualGraph provider : providers) {
577 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
579 // procedure.execute(graph, id);
590 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
592 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
594 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
595 // final TIntHashSet result = new TIntHashSet(5);
597 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
598 // if (providers != null) {
600 // for (VirtualGraph provider : providers) {
602 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
604 // procedure.execute(graph, id);
611 // // wheels within wheels
612 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
615 // public boolean execute(int callerThread, Object context, int object) {
618 // System.out.println("-found object " + object);
619 // if (result.add(object)) {
620 // procedure.execute(graph.newAsync(callerThread), object);
623 // return false; // continue looping
628 // public boolean found() {
629 // throw new UnsupportedOperationException();
635 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
636 // } catch (DatabaseException e) {
637 // Logger.defaultLogError(e);
639 // } catch (Throwable t) {
640 // Logger.defaultLogError(t);
641 // t.printStackTrace();
644 // return found.get();
648 // // wheels within wheels
649 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
651 // boolean found = false;
654 // public boolean execute(int callerThread, Object context, int object) {
657 // procedure.execute(graph.newAsync(callerThread), object);
658 // return false; // continue looping
663 // public boolean found() {
668 // public String toString() {
669 // return "Wheels for " + procedure;
675 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
676 // } catch (DatabaseException e) {
677 // Logger.defaultLogError(e);
679 // } catch (Throwable t) {
680 // t.printStackTrace();
681 // Logger.defaultLogError(t);
685 // return proc.found();
692 public int getFunctionalObject(final int subject, final int predicate) {
694 assert (subject != 0);
695 assert (predicate != 0);
701 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
702 if (providers != null) {
704 for (VirtualGraph provider : providers) {
706 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
707 if(found == 0) found = id;
716 // if(found == -1) return 0;
717 // else return found;
721 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
723 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
726 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
727 if (providers != null) {
729 for (VirtualGraph provider : providers) {
731 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
732 if(result == 0) result = id;
740 if(result != 0) return result;
743 return cluster.getSingleObject(subject, predicate, clusterSupport);
744 } catch (DatabaseException e) {
751 return cluster.getSingleObject(subject, predicate, clusterSupport);
752 } catch (DatabaseException e) {
755 // This comes if the resource is bogus
756 catch (Throwable t) {
758 analyseProblem(cluster);
760 Logger.defaultLogError(t);
771 // public boolean getStatements(final ReadGraphImpl graph, final int subject, final int predicate,
772 // final TripleIntProcedure procedure, final Statements entry) {
774 // assert (subject != 0);
775 // assert (predicate != 0);
777 // final TIntHashSet result = new TIntHashSet(16);
778 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
780 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
782 // if (providers != null) {
784 // for (VirtualGraph provider : providers) {
786 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
790 // if (result.add(id)) {
791 // if (null != entry) {
792 // entry.addOrSet(subject, predicate, id);
793 // procedure.execute(graph, subject, predicate, id);
794 // return true; // break;
796 // procedure.execute(graph, subject, predicate, id);
807 // return found.get();
809 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
810 // assert (cluster != null);
812 // // wheels within wheels
813 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
815 // public boolean execute(int callerThread, Object context, int object) {
819 // if (result.add(object)) {
820 // if (null != entry) {
821 // entry.addOrSet(subject, predicate, object);
822 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
823 // return true; // break;
825 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
829 // return false; // continue looping
834 // public boolean found() {
835 // throw new UnsupportedOperationException();
841 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
842 // } catch (DatabaseException e) {
843 // Logger.defaultLogError(e);
845 // return found.get();
850 public org.simantics.db.DirectStatements getStatements(final ReadGraphImpl graph, final int subject, final QueryProcessor processor, boolean ignoreVirtual) {
852 assert (subject != 0);
854 final DirectStatementsImpl result = new DirectStatementsImpl(resourceSupport, subject);
856 if (!ignoreVirtual) {
857 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
858 if (providers != null) {
859 for (TransientGraph provider : providers) {
860 for (int p : provider.getPredicates(subject)) {
861 for (int o : provider.getObjects(subject, p)) {
862 result.addStatement(p, o);
872 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
873 assert (cluster != null);
874 doGetStatements(graph, cluster, subject, result);
882 public void getPredicates(final ReadGraphImpl graph, final int subject, final IntProcedure procedure) throws DatabaseException {
884 final TIntHashSet result = new TIntHashSet(16);
886 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
888 if (providers != null) {
890 for (VirtualGraph provider : providers) {
892 for (int id : ((VirtualGraphImpl)provider).getPredicates(subject)) {
894 if (result.add(id)) {
895 procedure.execute(graph, id);
905 procedure.finished(graph);
909 ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
910 assert (proxy != null);
912 final DataContainer<Integer> got = new DataContainer<Integer>(0);
913 ClusterI.PredicateProcedure<Object> proc = new ClusterI.PredicateProcedure<Object>() {
915 public boolean execute(Object context, int predicate, int oi) {
916 if (result.add(predicate)) {
918 procedure.execute(graph, predicate);
919 } catch (DatabaseException e) {
920 Logger.defaultLogError(e);
923 got.set(got.get() + 1);
924 return false; // continue looping
928 proxy.forPredicates(subject, proc, null, clusterSupport);
929 } catch (DatabaseException e) {
930 Logger.defaultLogError(e);
932 procedure.finished(graph);
939 // public void getValue(ReadGraphImpl graph, int resource, InternalProcedure<byte[]> procedure) {
941 // if(resource < 0) {
943 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
945 // if (providers != null) {
947 // for (VirtualGraph provider : providers) {
949 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
950 // if (value != null) {
951 // procedure.execute(graph, (byte[])value);
963 // ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
964 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
966 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
968 // if (providers != null) {
970 // for (VirtualGraph provider : providers) {
972 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
973 // if (value != null) {
974 // procedure.execute(graph, (byte[])value);
984 // byte[] data = cluster.getValue(resource, clusterSupport);
985 // if (null == data || 0 == data.length) {
986 // procedure.execute(graph, null);
988 // procedure.execute(graph, data);
991 // } catch (DatabaseException e) {
992 // Logger.defaultLogError(e);
999 // byte[] data = cluster.getValue(resource, clusterSupport);
1000 // if (null == data || 0 == data.length) {
1001 // procedure.execute(graph, null);
1003 // procedure.execute(graph, data);
1006 // } catch (DatabaseException e) {
1007 // Logger.defaultLogError(e);
1016 public byte[] getValue(ReadGraphImpl graph, int resource) {
1020 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1022 if (providers != null) {
1024 for (VirtualGraph provider : providers) {
1026 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1027 if (value != null) {
1028 return (byte[])value;
1039 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1040 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1042 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1044 if (providers != null) {
1046 for (VirtualGraph provider : providers) {
1048 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1049 if (value != null) {
1050 return (byte[])value;
1059 byte[] data = cluster.getValue(resource, clusterSupport);
1060 if (null != data && 0 != data.length) {
1064 } catch (DatabaseException e) {
1065 Logger.defaultLogError(e);
1074 byte[] data = cluster.getValue(resource, clusterSupport);
1075 if (null != data && 0 != data.length) {
1079 } catch (DatabaseException e) {
1080 Logger.defaultLogError(e);
1090 public InputStream getValueStream(ReadGraphImpl graph, int resource) {
1094 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1096 if (providers != null) {
1098 for (VirtualGraph provider : providers) {
1100 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1101 if (value != null) {
1102 return new ByteArrayInputStream((byte[])value);
1113 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1114 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1116 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1118 if (providers != null) {
1120 for (VirtualGraph provider : providers) {
1122 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1123 if (value != null) {
1124 return new ByteArrayInputStream((byte[])value);
1133 return cluster.getValueStream(resource, clusterSupport);
1135 } catch (DatabaseException e) {
1136 Logger.defaultLogError(e);
1145 return cluster.getValueStream(resource, clusterSupport);
1147 } catch (DatabaseException e) {
1148 Logger.defaultLogError(e);
1158 public void requestCluster(ReadGraphImpl graph, final long clusterId, final Runnable r) {
1160 class CallbackAdapter implements Consumer<DatabaseException> {
1162 CallbackAdapter(final Runnable r) {
1166 public void accept(DatabaseException e) {
1168 e.printStackTrace();
1175 double p = clusterTable.getLoadProbability();
1176 // System.out.print("Load cluster " + clusterId + " with probability " + p +
1178 final ClusterI proxy = clusterSupport.getClusterByClusterId(clusterId);
1179 if (!proxy.isLoaded()) {
1181 if (Math.random() < p) {
1182 proxy.load(new CallbackAdapter(r));
1193 public int getBuiltin(String uri) {
1194 return builtinSupport.getBuiltin(uri);
1198 public void checkTasks() {
1199 System.out.println(syncThreads.toString());
1203 // public void asyncWrite(Write request) {
1205 //// if(plainWrite(writer) && sameProvider(request)) {
1206 //// writer.writeSupport.pushRequest(request);
1208 // asyncRequest(request);
1219 private void doGetStatements(ReadGraphImpl graph, final ClusterI cluster, final int subject, final DirectStatementsImpl result) {
1221 final class Proc implements ClusterI.PredicateProcedure<Object> {
1224 public boolean execute(Object context, final int predicate, final int objectIndex) {
1226 doExecute(null, predicate, objectIndex);
1231 private void doExecute(Object context, final int predicate, final int objectIndex) {
1234 cluster.forObjects(subject, predicate, new ClusterI.ObjectProcedure<Object>() {
1237 public boolean execute(Object context, int object) {
1238 result.addStatement(predicate, object);
1242 }, null, clusterSupport);
1243 } catch (DatabaseException e) {
1244 e.printStackTrace();
1252 cluster.forPredicates(subject, new Proc(), null, clusterSupport);
1253 } catch (DatabaseException e) {
1254 e.printStackTrace();
1259 // private void getDirectObjects4(final int callerThread, final ClusterI cluster, final int subject, final int predicate, final QueryProcessor processor, final ReadGraphImpl graph, final ForEachObjectProcedure procedure) {
1261 // if(!cluster.isLoaded()) {
1263 // requestCluster(callerThread, cluster.getClusterId(), new Callback<Integer>() {
1266 // public void run(Integer i) {
1268 // processor.schedule(i, new SessionTask(callerThread) {
1271 // public void run(int thread) {
1273 // getDirectObjects4(thread, cluster, subject, predicate, processor, graph, procedure);
1286 // cluster.forObjects(graph, subject, predicate, procedure);
1287 // } catch (DatabaseException e) {
1288 // e.printStackTrace();
1291 //// procedure.finished(graph);
1292 //// graph.state.barrier.dec();
1294 //// System.err.println("ai2=" + ai2.decrementAndGet());
1301 // AtomicInteger ai2 =new AtomicInteger(0);
1303 // private boolean testCluster(int subject, ClusterI proxy) {
1305 // if (proxy == null)
1306 // System.out.println("null cluster: " + Integer.toString(subject, 16));
1308 // return proxy != null;
1312 long getCluster(int id) {
1314 if(id < 0) return 0;
1315 ClusterI proxy = clusterTable.getClusterByResourceKey(id);
1316 if(proxy == null) return 0;
1317 else return proxy.getClusterId();
1321 public int getRandomAccessReference(String id) throws ResourceNotFoundException {
1324 Resource res = serializationSupport.getResourceSerializer().getResource(id);
1325 if(res == null) return 0;
1326 else return ((ResourceImpl)res).id;
1327 } catch (InvalidResourceReferenceException e) {
1328 //e.printStackTrace();
1335 public void ensureLoaded(final ReadGraphImpl graph, final int subject, final int predicate) {
1339 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1343 final ClusterI cluster = clusterTable.checkedGetClusterByResourceKey(subject);
1345 if(cluster.isLoaded()) {
1347 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1349 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1357 new Exception().printStackTrace();
1359 cluster.load(session.clusterTranslator, new Runnable() {
1364 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1366 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1384 public void ensureLoaded(final ReadGraphImpl graph, final int subject) {
1388 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1392 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1394 if(cluster.isLoaded()) {
1396 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1398 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1406 // System.err.println("cluster not loaded " + subject);
1407 new Exception().printStackTrace();
1409 cluster.load(session.clusterTranslator, new Runnable() {
1414 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1416 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1434 public boolean isLoaded(int subject) {
1435 ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1436 return cluster.isLoaded();
1440 public void ceased(int thread) {
1442 session.ceased(thread);
1447 public Object getLock() {
1449 return session.requestManager;
1454 public VirtualGraph getProvider(int subject, int predicate, int object) {
1457 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1458 // This persistent resource does not have virtual statements => must deny in persistent graph
1459 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1462 for(TransientGraph g : virtualGraphServerSupport.providers) {
1463 for (final int id : g.getObjects(subject, predicate)) {
1464 if(object == id) return g;
1468 // Nothing found from virtual graphs
1474 public VirtualGraph getProvider(int subject, int predicate) {
1477 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1478 // This persistent resource does not have virtual statements => must deny in persistent graph
1479 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1482 TransientGraph result = null;
1483 for(TransientGraph g : virtualGraphServerSupport.providers) {
1484 if(g.getObjects(subject, predicate).length > 0) {
1485 // Found multiple, return null;
1486 if(result != null) return null;
1495 public VirtualGraph getValueProvider(int subject) {
1498 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1499 // This persistent resource does not have virtual statements => must deny in persistent graph
1500 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1503 TransientGraph result = null;
1504 for(TransientGraph g : virtualGraphServerSupport.providers) {
1505 if(g.getValue(subject) != null) {
1506 if(result != null) return null;
1515 public void exit(Throwable t) {
1519 private void analyseProblem(ClusterI cluster) {
1521 if(cluster instanceof ClusterSmall)
1523 ((ClusterSmall)cluster).check();
1524 } catch (DatabaseException e) {
1525 e.printStackTrace();