1 package fi.vtt.simantics.procore.internal;
3 import gnu.trove.set.hash.TIntHashSet;
5 import java.io.ByteArrayInputStream;
6 import java.io.InputStream;
7 import java.util.Collection;
8 import java.util.function.Consumer;
10 import org.simantics.db.Resource;
11 import org.simantics.db.Session;
12 import org.simantics.db.Statement;
13 import org.simantics.db.VirtualGraph;
14 import org.simantics.db.WriteGraph;
15 import org.simantics.db.common.StandardStatement;
16 import org.simantics.db.common.request.WriteRequest;
17 import org.simantics.db.common.utils.Logger;
18 import org.simantics.db.exception.DatabaseException;
19 import org.simantics.db.exception.InvalidResourceReferenceException;
20 import org.simantics.db.exception.ResourceNotFoundException;
21 import org.simantics.db.impl.ClusterI;
22 import org.simantics.db.impl.ClusterSupport;
23 import org.simantics.db.impl.ForEachObjectContextProcedure;
24 import org.simantics.db.impl.ForEachObjectProcedure;
25 import org.simantics.db.impl.ResourceImpl;
26 import org.simantics.db.impl.TransientGraph;
27 import org.simantics.db.impl.VirtualGraphImpl;
28 import org.simantics.db.impl.graph.ReadGraphImpl;
29 import org.simantics.db.impl.query.IntProcedure;
30 import org.simantics.db.impl.query.QueryProcessor;
31 import org.simantics.db.impl.query.QuerySupport;
32 import org.simantics.db.impl.support.BuiltinSupport;
33 import org.simantics.db.impl.support.ResourceSupport;
34 import org.simantics.db.procore.cluster.ClusterImpl;
35 import org.simantics.db.procore.cluster.ClusterSmall;
36 import org.simantics.db.service.SerialisationSupport;
37 import org.simantics.utils.DataContainer;
39 public class QuerySupportImpl implements QuerySupport {
// Owning session; most of the collaborators below are read from it in the constructor.
41 final SessionImplSocket session;
// Collaborators used by the query methods of this class.
43 final ClusterTable clusterTable;
44 final BuiltinSupport builtinSupport;
45 final ClusterSupport clusterSupport;
46 final ResourceSupport resourceSupport;
47 final SerialisationSupport serializationSupport;
48 final VirtualGraphServerSupportImpl virtualGraphServerSupport;
49 final GraphSession graphSession;
50 final SessionRequestManager syncThreads;
// Set by dirtyPrimitives() when it schedules its write request and cleared again
// when that request is performed, so that only one such request is queued at a time.
53 private boolean pendingPrimitives = false;
// Wires up the query support for a session. state, clusterTable, resourceSupport,
// virtualGraphServerSupport, graphSession and builtinSupport are read from the
// session; clusterSupport, serializationSupport and syncThreads are passed in.
55 QuerySupportImpl(SessionImplSocket session, ClusterSupport clusterSupport, SerialisationSupport serializationSupport, SessionRequestManager syncThreads) {
56 this.session = session;
57 this.state = session.state;
58 this.clusterTable = session.clusterTable;
59 this.resourceSupport = session.resourceSupport;
60 this.virtualGraphServerSupport = session.virtualGraphServerSupport;
61 this.graphSession = session.graphSession;
62 this.builtinSupport = session.builtinSupport;
63 this.clusterSupport = clusterSupport;
64 this.serializationSupport = serializationSupport;
65 this.syncThreads = syncThreads;
// getBuiltin() delegates to builtinSupport, so this lookup has to come after
// builtinSupport has been assigned above.
66 this.root = getBuiltin("http:/");
// Sanity checks (only evaluated when assertions are enabled): no collaborator may be null.
67 assert(this.session != null);
68 assert(this.state != null);
69 assert(this.clusterTable != null);
70 assert(this.resourceSupport != null);
71 assert(this.virtualGraphServerSupport != null);
72 assert(this.graphSession != null);
73 assert(this.builtinSupport != null);
74 assert(this.clusterSupport != null);
75 assert(this.serializationSupport != null);
76 assert(this.syncThreads != null);
// Exposes the ResourceSupport that was taken from the session at construction time.
80 public ResourceSupport getSupport() {
81 return resourceSupport;
// Convenience overload: builds the statement without a graph (passes null for it).
85 public Statement getStatement(int s, int p, int o) {
86 return getStatement(null, s, p, o);
// Resolves the subject, predicate and object keys to Resources via getResource(int)
// and wraps them in a StandardStatement. The graph argument is not used here.
90 public Statement getStatement(ReadGraphImpl graph, int s, int p, int o) {
91 Resource sr = getResource(s);
92 Resource pr = getResource(p);
93 Resource or = getResource(o);
94 return new StandardStatement(sr, pr, or);
// Accessor for the Session.
// NOTE(review): presumably the session this instance was constructed with - confirm.
98 public Session getSession() {
// Returns the id of the cluster that the cluster table associates with the given
// resource key, or 0 when the table has no cluster for that key.
103 public long getClusterId(int id) {
104 ClusterI cluster = clusterTable.getClusterByResourceKey(id);
105 if(cluster == null) return 0;
106 return cluster.getClusterId();
// Tells whether the given resource key refers to immutable data. Negative keys,
// the root library and everything while the session is in service mode are
// reported as mutable; every other key is decided by the cluster table.
110 public boolean isImmutable(int id) {
111 // Virtuals are mutable
112 if(id < 0) return false;
113 // Root library is mutable
114 if(root == id) return false;
115 // Anything goes in service mode
116 if(session.serviceMode > 0) return false;
117 return clusterTable.isImmutable(id);
// Maps a Resource to its integer key. For ResourceImpl instances the key is read
// directly from the id field.
121 public int getId(Resource resource) {
122 if (resource instanceof ResourceImpl)
123 return ((ResourceImpl)resource).id;
// Maps an integer key back to a Resource through the serialisation support.
128 public Resource getResource(int id) {
130 return serializationSupport.getResource(id);
// The lookup can fail with a DatabaseException; it is caught here because this
// method declares no checked exceptions.
131 } catch (DatabaseException e) {
// Delegates to resume(graph) on the query provider of the session held by syncThreads.
138 public boolean resume(ReadGraphImpl graph) {
140 return syncThreads.session.queryProvider2.resume(graph);
145 // final public void sync(int resumeThread, final SessionRunnable runnable) {
147 // syncThreads.session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(resumeThread) {
150 // public void run(int thread) {
151 // runnable.run(thread);
159 // final public int nextSyncThread() {
160 // throw new Error();
161 //// return syncThreads.nextThread();
// Flags the session's primitives as dirty. When state.getWriteCount() is 0 and no
// request from an earlier call is still pending, an asynchronous WriteRequest is
// queued on the session.
// NOTE(review): presumably the queued write exists to get the dirty primitives
// processed when no write is in progress - confirm.
165 public void dirtyPrimitives() {
166 session.dirtyPrimitives = true;
167 if(state.getWriteCount() == 0 && !pendingPrimitives) {
// Mark as pending before queueing so that repeated calls do not queue more requests.
168 pendingPrimitives = true;
169 session.asyncRequest(new WriteRequest() {
172 public void perform(WriteGraph graph) throws DatabaseException {
// The request has run; allow the next dirtyPrimitives() call to queue a new one.
173 pendingPrimitives = false;
182 final public void aboutToRead() {
186 // public void increaseReferenceCount(int callerThread, int subject) {
189 // ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
190 // if (null == proxy)
192 // proxy.increaseReferenceCount(callerThread, 1);
196 // public void decreaseReferenceCount(int callerThread, int subject) {
199 // ClusterProxy proxy = clusterTable.getClusterByResourceKey(subject);
200 // if (null == proxy)
202 // proxy.decreaseReferenceCount(callerThread, 1);
// Feeds the objects of (subject, procedure.predicateKey) to the procedure as
// ResourceImpl instances. Objects are collected from the transient graph providers
// and/or from the cluster that holds the subject.
206 public void getObjects4(final ReadGraphImpl graph, final int subject, final ForEachObjectProcedure procedure) {
// Report the objects known to each transient graph provider, then signal completion.
210 for(TransientGraph g : virtualGraphServerSupport.providers) {
211 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
213 // int suggestSchedule = graph.processor.processor.resourceThread(id);
214 // if(graph.callerThread == suggestSchedule) {
215 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
217 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
220 // public void run(int thread) {
221 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
229 procedure.finished(graph);
235 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
// Cluster not loaded yet: start loading and hand over a Runnable that re-invokes
// this method with the same arguments.
236 if(!cluster.isLoaded()) {
237 cluster.load(session.clusterTranslator, new Runnable() {
241 getObjects4(graph, subject, procedure);
// The subject also has virtual statements: report those from the providers first,
// then let the cluster report its own objects.
248 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
250 for(TransientGraph g : virtualGraphServerSupport.providers) {
251 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
252 // int suggestSchedule = graph.processor.processor.resourceThread(id);
253 // if(graph.callerThread == suggestSchedule) {
254 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
256 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
259 // public void run(int thread) {
260 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
269 cluster.forObjects(graph, subject, procedure);
270 } catch (DatabaseException e) {
// No virtual statements for this subject: the cluster alone reports the objects.
277 cluster.forObjects(graph, subject, procedure);
278 } catch (DatabaseException e) {
// Variant of getObjects4 that threads a caller-supplied context object through to
// every execute/finished call of the procedure. Objects are collected from the
// transient graph providers and/or from the cluster that holds the subject.
286 public <C> void getObjects4(final ReadGraphImpl graph, final int subject, final C context, final ForEachObjectContextProcedure<C> procedure) {
// Report the objects known to each transient graph provider, then signal completion.
290 for(TransientGraph g : virtualGraphServerSupport.providers) {
291 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
293 // int suggestSchedule = graph.processor.processor.resourceThread(id);
294 // if(graph.callerThread == suggestSchedule) {
295 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
297 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
300 // public void run(int thread) {
301 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
309 procedure.finished(graph, context);
315 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
// Cluster not loaded yet: start loading and hand over a Runnable that re-invokes
// this method with the same arguments.
316 if(!cluster.isLoaded()) {
317 cluster.load(session.clusterTranslator, new Runnable() {
321 getObjects4(graph, subject, context, procedure);
// The subject also has virtual statements: report those from the providers first,
// then let the cluster report its own objects.
328 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
330 for(TransientGraph g : virtualGraphServerSupport.providers) {
331 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
332 // int suggestSchedule = graph.processor.processor.resourceThread(id);
333 // if(graph.callerThread == suggestSchedule) {
334 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
336 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
339 // public void run(int thread) {
340 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
349 cluster.forObjects(graph, subject, context, procedure);
350 } catch (DatabaseException e) {
// No virtual statements for this subject: the cluster alone reports the objects.
357 cluster.forObjects(graph, subject, context, procedure);
358 } catch (DatabaseException e) {
// Looks up the complete-object key of the subject when the cluster reports its
// complete type as InstanceOf. Virtual (negative) subjects always yield 0.
367 public int getSingleInstance(final int subject) {
369 // Do not process this information for virtual resources
370 if(subject < 0) return 0;
372 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
// Diagnostic output; the subject key is printed in hexadecimal.
374 System.out.println("null cluster: " + Integer.toString(subject, 16));
375 assert (cluster != null);
379 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
380 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
381 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
388 } catch (DatabaseException e) {
390 Logger.defaultLogError(e);
394 // This happens if the resource is bogus
395 catch (Throwable t) {
// Inspect the cluster for diagnostics before logging the failure.
397 analyseProblem(cluster);
399 Logger.defaultLogError(t);
// Looks up the complete-object key of the subject when the cluster reports its
// complete type as SubrelationOf. Virtual (negative) subjects always yield 0.
408 public int getSingleSuperrelation(final int subject) {
410 // Do not process this information for virtual resources
411 if(subject < 0) return 0;
413 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
// Diagnostic output; the subject key is printed in hexadecimal.
415 System.out.println("null cluster: " + Integer.toString(subject, 16));
416 assert (cluster != null);
420 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
421 if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
422 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
// Database failures are logged through the default logger.
429 } catch (DatabaseException e) {
431 Logger.defaultLogError(e);
439 // public void getSingleSuperrelation(ReadGraphImpl graph, final int subject, final AsyncProcedure<Resource> procedure) {
441 // // Do not process this information for virtual resources
443 // procedure.execute(graph, null);
444 // graph.state.barrier.dec();
448 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
449 // if (cluster == null)
450 // System.out.println("null cluster: " + Integer.toString(subject, 16));
452 // assert (cluster != null);
454 // if(!cluster.isLoaded()) {
456 // procedure.execute(graph, null);
457 // graph.state.barrier.dec();
460 //// queryProvider2.requestCluster(callerThread, cluster.getClusterId(), new Callback<Integer>() {
463 //// public void run(Integer i) {
465 //// queryProvider2.schedule(i, callerThread, new Runnable() {
468 //// public void run() {
472 //// ClusterI.CompleteTypeEnum type = cluster.getCompleteType(callerThread, subject, SessionImplSocket.this);
473 //// if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
474 //// int result = cluster.getCompleteObjectKey(callerThread, subject, SessionImplSocket.this);
475 //// assert(result > 0);
476 //// procedure.execute(graph, getResourceByKey(result));
478 //// procedure.execute(graph, null);
480 //// graph.state.barrier.dec();
482 //// } catch (DatabaseException e) {
483 //// e.printStackTrace();
498 // ClusterI.CompleteTypeEnum type = cluster.getCompleteType(graph.callerThread, subject, clusterSupport);
499 // if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
500 // int result = cluster.getCompleteObjectKey(graph.callerThread, subject, clusterSupport);
501 // assert(result > 0);
502 // procedure.execute(graph, new ResourceImpl(resourceSupport, result));
504 // procedure.execute(graph, null);
506 // graph.state.barrier.dec();
508 // } catch (DatabaseException e) {
509 // e.printStackTrace();
518 // public void getObjects2(final int callerThread, final int subject, final int predicate, final IntProcedure procedure) {
519 // ensureLoaded(callerThread, subject, predicate, new Runnable() {
522 // public void run() {
523 // safeGetObjects2(callerThread, subject, predicate, procedure);
529 // public void safeGetObjects2(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) {
531 // assert (subject != 0);
532 // assert (predicate != 0);
533 //// System.out.println("getObjects2: s=" + subject + "p=" + predicate);
534 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
535 // if (providers != null) {
537 // final TIntHashSet result = new TIntHashSet(16);
539 // for (VirtualGraph provider : providers) {
541 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
543 // if (result.add(id)) {
544 // procedure.execute(graph, id);
554 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
556 // assert (testCluster(subject, cluster));
558 // // wheels within wheels
559 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
562 // public boolean execute(int callerThread, Object context, int object) {
564 // if (result.add(object)) {
565 // procedure.execute(graph.newAsync(callerThread), object);
568 // return false; // continue looping
573 // public boolean found() {
574 // throw new UnsupportedOperationException();
580 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
581 // } catch (DatabaseException e) {
582 // Logger.defaultLogError(e);
583 // } catch (Throwable t) {
584 // Logger.defaultLogError(t);
585 // t.printStackTrace();
591 // assert(subject > 0);
593 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
595 // assert (testCluster(subject, cluster));
598 // cluster.forObjects(graph.callerThread, subject, predicate, new Wheels(procedure), null, clusterSupport);
599 // } catch (DatabaseException e) {
600 // Logger.defaultLogError(e);
601 // } catch (Throwable t) {
602 // t.printStackTrace();
603 // Logger.defaultLogError(t);
// Feeds the object keys of (subject, predicate) to the procedure, drawing on the
// transient graph providers and on the cluster that holds the subject. The boolean
// result is tracked through the various "found" variables below.
609 public boolean getObjects(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) {
611 assert (subject != 0);
612 assert (predicate != 0);
616 boolean found = false;
// Report every object the transient graph providers know for this statement.
618 for(TransientGraph g : virtualGraphServerSupport.providers) {
619 for (final int id : g.getObjects(subject, predicate)) {
621 procedure.execute(graph, id);
629 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
// This method does not load clusters itself; the cluster is expected to be loaded already.
630 assert(cluster.isLoaded());
// The subject has both persistent and virtual statements.
632 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
634 final DataContainer<Boolean> found = new DataContainer<Boolean>(Boolean.FALSE);
// Keys already reported; keeps an object that is present both in a provider and in
// the cluster from being reported twice.
635 final TIntHashSet result = new TIntHashSet(5);
637 for(TransientGraph g : virtualGraphServerSupport.providers) {
638 for (final int id : g.getObjects(subject, predicate)) {
641 if (result.add(id)) {
642 procedure.execute(graph, id);
648 // Virtual predicates are not found from persistent clusters
649 if(predicate < 0) return found.get();
651 // wheels within wheels
652 final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
655 public boolean execute(Object context, int object) {
// Forward only keys that have not been reported yet.
658 if (result.add(object)) {
659 procedure.execute(graph, object);
662 return false; // continue looping
669 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
670 } catch (DatabaseException e) {
678 // Virtual predicates are not found from persistent clusters
679 if(predicate < 0) return false;
// Adapter that forwards every object of the cluster to the caller's procedure
// (no de-duplication is needed on this path).
681 class A implements ClusterI.ObjectProcedure<Object> {
683 boolean found = false;
686 public boolean execute(Object context, int object) {
689 procedure.execute(graph, object);
691 return false; // continue looping
695 public boolean found() {
701 // wheels within wheels
702 final A proc = new A();
705 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
706 } catch (DatabaseException e) {
709 // This happens if resource is bogus
710 catch (Throwable t) {
// Inspect the cluster for diagnostics before logging the failure.
712 analyseProblem(cluster);
714 Logger.defaultLogError(t);
725 // public boolean getFunctionalObject(final ReadGraphImpl graph, final int subject, final int predicate,
726 // final IntProcedure procedure) {
728 // assert (subject != 0);
729 // assert (predicate != 0);
733 // boolean found = false;
735 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
736 // if (providers != null) {
738 // for (VirtualGraph provider : providers) {
740 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
742 // procedure.execute(graph, id);
753 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
755 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
757 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
758 // final TIntHashSet result = new TIntHashSet(5);
760 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
761 // if (providers != null) {
763 // for (VirtualGraph provider : providers) {
765 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
767 // procedure.execute(graph, id);
774 // // wheels within wheels
775 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
778 // public boolean execute(int callerThread, Object context, int object) {
781 // System.out.println("-found object " + object);
782 // if (result.add(object)) {
783 // procedure.execute(graph.newAsync(callerThread), object);
786 // return false; // continue looping
791 // public boolean found() {
792 // throw new UnsupportedOperationException();
798 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
799 // } catch (DatabaseException e) {
800 // Logger.defaultLogError(e);
802 // } catch (Throwable t) {
803 // Logger.defaultLogError(t);
804 // t.printStackTrace();
807 // return found.get();
811 // // wheels within wheels
812 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
814 // boolean found = false;
817 // public boolean execute(int callerThread, Object context, int object) {
820 // procedure.execute(graph.newAsync(callerThread), object);
821 // return false; // continue looping
826 // public boolean found() {
831 // public String toString() {
832 // return "Wheels for " + procedure;
838 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
839 // } catch (DatabaseException e) {
840 // Logger.defaultLogError(e);
842 // } catch (Throwable t) {
843 // t.printStackTrace();
844 // Logger.defaultLogError(t);
848 // return proc.found();
// Looks up a single object key for (subject, predicate), consulting the virtual
// graphs registered for the subject and the cluster's getSingleObject.
855 public int getFunctionalObject(final int subject, final int predicate) {
857 assert (subject != 0);
858 assert (predicate != 0);
864 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
865 if (providers != null) {
867 for (VirtualGraph provider : providers) {
869 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
// Record an id only while nothing has been recorded yet.
870 if(found == 0) found = id;
879 // if(found == -1) return 0;
880 // else return found;
884 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
// The subject has both persistent and virtual statements: ask the virtual graphs first.
886 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
889 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
890 if (providers != null) {
892 for (VirtualGraph provider : providers) {
894 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
// Record an id only while nothing has been recorded yet.
895 if(result == 0) result = id;
// A virtual object takes precedence; the cluster is only asked when none was recorded.
903 if(result != 0) return result;
906 return cluster.getSingleObject(subject, predicate, clusterSupport);
907 } catch (DatabaseException e) {
// No virtual statements for this subject: the cluster alone provides the answer.
914 return cluster.getSingleObject(subject, predicate, clusterSupport);
915 } catch (DatabaseException e) {
918 // This comes if the resource is bogus
919 catch (Throwable t) {
// Inspect the cluster for diagnostics before logging the failure.
921 analyseProblem(cluster);
923 Logger.defaultLogError(t);
934 // public boolean getStatements(final ReadGraphImpl graph, final int subject, final int predicate,
935 // final TripleIntProcedure procedure, final Statements entry) {
937 // assert (subject != 0);
938 // assert (predicate != 0);
940 // final TIntHashSet result = new TIntHashSet(16);
941 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
943 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
945 // if (providers != null) {
947 // for (VirtualGraph provider : providers) {
949 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
953 // if (result.add(id)) {
954 // if (null != entry) {
955 // entry.addOrSet(subject, predicate, id);
956 // procedure.execute(graph, subject, predicate, id);
957 // return true; // break;
959 // procedure.execute(graph, subject, predicate, id);
970 // return found.get();
972 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
973 // assert (cluster != null);
975 // // wheels within wheels
976 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
978 // public boolean execute(int callerThread, Object context, int object) {
982 // if (result.add(object)) {
983 // if (null != entry) {
984 // entry.addOrSet(subject, predicate, object);
985 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
986 // return true; // break;
988 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
992 // return false; // continue looping
997 // public boolean found() {
998 // throw new UnsupportedOperationException();
1004 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
1005 // } catch (DatabaseException e) {
1006 // Logger.defaultLogError(e);
1008 // return found.get();
// Collects the (predicate, object) pairs of the subject into a DirectStatementsImpl.
// Statements from the subject's virtual graphs are included unless ignoreVirtual is
// set; persistent statements are gathered by doGetStatements.
1013 public org.simantics.db.DirectStatements getStatements(final ReadGraphImpl graph, final int subject, final QueryProcessor processor, boolean ignoreVirtual) {
1015 assert (subject != 0);
1017 final DirectStatementsImpl result = new DirectStatementsImpl(resourceSupport, subject);
1019 if (!ignoreVirtual) {
1020 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
1021 if (providers != null) {
// Enumerate every predicate of the subject in each virtual graph, then every
// object of that predicate.
1022 for (TransientGraph provider : providers) {
1023 for (int p : provider.getPredicates(subject)) {
1024 for (int o : provider.getObjects(subject, p)) {
1025 result.addStatement(p, o);
1035 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1036 assert (cluster != null);
1037 doGetStatements(graph, cluster, subject, result);
1045 // public void getStatements(ReadGraphImpl graph, final int subject, final Procedure<DirectStatements> procedure) {
1047 // procedure.exception(new DatabaseException("Not supported"));
// Feeds the distinct predicate keys of the subject to the procedure and signals
// completion with procedure.finished(graph). Predicates come from the subject's
// virtual graphs and from the cluster that holds the subject.
1052 public void getPredicates(final ReadGraphImpl graph, final int subject, final IntProcedure procedure) {
// Keys already reported; each predicate is passed to the procedure at most once.
1054 final TIntHashSet result = new TIntHashSet(16);
1056 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
1058 if (providers != null) {
1060 for (VirtualGraph provider : providers) {
1062 for (int id : ((VirtualGraphImpl)provider).getPredicates(subject)) {
1064 if (result.add(id)) {
1065 procedure.execute(graph, id);
1075 procedure.finished(graph);
1079 ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
1080 // if(!proxy.isLoaded()) {
1082 // proxy.load(callerThread, session, new Runnable() {
1085 // public void run() {
1086 // getPredicates(callerThread, subject, procedure);
1093 assert (proxy != null);
// Counts the callbacks received from the cluster.
1095 final DataContainer<Integer> got = new DataContainer<Integer>(0);
1096 ClusterI.PredicateProcedure<Object> proc = new ClusterI.PredicateProcedure<Object>() {
1098 public boolean execute(Object context, int predicate, int oi) {
// Forward only predicates that have not been reported yet.
1099 if (result.add(predicate)) {
1100 procedure.execute(graph, predicate);
1102 got.set(got.get() + 1);
1103 return false; // continue looping
1107 proxy.forPredicates(subject, proc, null, clusterSupport);
1108 } catch (DatabaseException e) {
1109 Logger.defaultLogError(e);
1111 procedure.finished(graph);
1118 // public void getValue(ReadGraphImpl graph, int resource, InternalProcedure<byte[]> procedure) {
1120 // if(resource < 0) {
1122 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1124 // if (providers != null) {
1126 // for (VirtualGraph provider : providers) {
1128 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
1129 // if (value != null) {
1130 // procedure.execute(graph, (byte[])value);
1142 // ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1143 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1145 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1147 // if (providers != null) {
1149 // for (VirtualGraph provider : providers) {
1151 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
1152 // if (value != null) {
1153 // procedure.execute(graph, (byte[])value);
1163 // byte[] data = cluster.getValue(resource, clusterSupport);
1164 // if (null == data || 0 == data.length) {
1165 // procedure.execute(graph, null);
1167 // procedure.execute(graph, data);
1170 // } catch (DatabaseException e) {
1171 // Logger.defaultLogError(e);
1178 // byte[] data = cluster.getValue(resource, clusterSupport);
1179 // if (null == data || 0 == data.length) {
1180 // procedure.execute(graph, null);
1182 // procedure.execute(graph, data);
1185 // } catch (DatabaseException e) {
1186 // Logger.defaultLogError(e);
// Reads the value attached to a resource as a byte array. The virtual graphs
// registered for the resource are consulted as well as the cluster holding it.
1195 public byte[] getValue(ReadGraphImpl graph, int resource) {
1199 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1201 if (providers != null) {
1203 for (VirtualGraph provider : providers) {
1205 Object value = ((VirtualGraphImpl)provider).getValue(resource);
// The first non-null value offered by a virtual graph is returned as-is.
1206 if (value != null) {
1207 return (byte[])value;
1218 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
// The resource has both persistent and virtual data: a virtual value takes precedence.
1219 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1221 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1223 if (providers != null) {
1225 for (VirtualGraph provider : providers) {
1227 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1228 if (value != null) {
1229 return (byte[])value;
1238 byte[] data = cluster.getValue(resource, clusterSupport);
// Checks for a non-null, non-empty array from the cluster.
1239 if (null != data && 0 != data.length) {
1243 } catch (DatabaseException e) {
1244 Logger.defaultLogError(e);
// No virtual data for this resource: only the cluster is asked.
1253 byte[] data = cluster.getValue(resource, clusterSupport);
1254 if (null != data && 0 != data.length) {
1258 } catch (DatabaseException e) {
1259 Logger.defaultLogError(e);
// Stream counterpart of getValue: a value found in a virtual graph is wrapped in a
// ByteArrayInputStream, otherwise the cluster supplies the stream.
1269 public InputStream getValueStream(ReadGraphImpl graph, int resource) {
1273 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1275 if (providers != null) {
1277 for (VirtualGraph provider : providers) {
1279 Object value = ((VirtualGraphImpl)provider).getValue(resource);
// The first non-null value offered by a virtual graph is returned.
1280 if (value != null) {
1281 return new ByteArrayInputStream((byte[])value);
1292 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
// The resource has both persistent and virtual data: a virtual value takes precedence.
1293 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1295 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1297 if (providers != null) {
1299 for (VirtualGraph provider : providers) {
1301 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1302 if (value != null) {
1303 return new ByteArrayInputStream((byte[])value);
1312 return cluster.getValueStream(resource, clusterSupport);
1314 } catch (DatabaseException e) {
1315 Logger.defaultLogError(e);
// No virtual data for this resource: only the cluster is asked.
1324 return cluster.getValueStream(resource, clusterSupport);
1326 } catch (DatabaseException e) {
1327 Logger.defaultLogError(e);
// Requests loading of the cluster with the given id. A cluster that is not loaded
// is loaded only with the probability reported by the cluster table.
1337 public void requestCluster(ReadGraphImpl graph, final long clusterId, final Runnable r) {
// Adapts the caller's Runnable to the Consumer<DatabaseException> callback that
// ClusterI.load expects.
1339 class CallbackAdapter implements Consumer<DatabaseException> {
1341 CallbackAdapter(final Runnable r) {
1345 public void accept(DatabaseException e) {
1347 e.printStackTrace();
1354 double p = clusterTable.getLoadProbability();
1355 // System.out.print("Load cluster " + clusterId + " with probability " + p +
1357 final ClusterI proxy = clusterSupport.getClusterByClusterId(clusterId);
1358 if (!proxy.isLoaded()) {
// Randomised decision: load when a uniform random draw falls below p.
1360 if (Math.random() < p) {
1361 proxy.load(new CallbackAdapter(r));
// Resolves a builtin by its URI; delegates to builtinSupport.
1372 public int getBuiltin(String uri) {
1373 return builtinSupport.getBuiltin(uri);
// Debug aid: prints the string form of syncThreads to standard output.
1377 public void checkTasks() {
1378 System.out.println(syncThreads.toString());
1382 // public void asyncWrite(Write request) {
1384 //// if(plainWrite(writer) && sameProvider(request)) {
1385 //// writer.writeSupport.pushRequest(request);
1387 // asyncRequest(request);
// Adds every persistent (predicate, object) pair of the subject found in the given
// cluster to result. The graph argument is not used in the lines below.
1398 private void doGetStatements(ReadGraphImpl graph, final ClusterI cluster, final int subject, final DirectStatementsImpl result) {
// Predicate callback: for each predicate of the subject, enumerate its objects.
1400 final class Proc implements ClusterI.PredicateProcedure<Object> {
1403 public boolean execute(Object context, final int predicate, final int objectIndex) {
1405 doExecute(null, predicate, objectIndex);
1410 private void doExecute(Object context, final int predicate, final int objectIndex) {
1413 cluster.forObjects(subject, predicate, new ClusterI.ObjectProcedure<Object>() {
1416 public boolean execute(Object context, int object) {
1417 result.addStatement(predicate, object);
1421 }, null, clusterSupport);
// Failures while enumerating objects are only printed to standard error.
1422 } catch (DatabaseException e) {
1423 e.printStackTrace();
1431 cluster.forPredicates(subject, new Proc(), null, clusterSupport);
// Failures while enumerating predicates are only printed to standard error.
1432 } catch (DatabaseException e) {
1433 e.printStackTrace();
1438 // private void getDirectObjects4(final int callerThread, final ClusterI cluster, final int subject, final int predicate, final QueryProcessor processor, final ReadGraphImpl graph, final ForEachObjectProcedure procedure) {
1440 // if(!cluster.isLoaded()) {
1442 // requestCluster(callerThread, cluster.getClusterId(), new Callback<Integer>() {
1445 // public void run(Integer i) {
1447 // processor.schedule(i, new SessionTask(callerThread) {
1450 // public void run(int thread) {
1452 // getDirectObjects4(thread, cluster, subject, predicate, processor, graph, procedure);
1465 // cluster.forObjects(graph, subject, predicate, procedure);
1466 // } catch (DatabaseException e) {
1467 // e.printStackTrace();
1470 //// procedure.finished(graph);
1471 //// graph.state.barrier.dec();
1473 //// System.err.println("ai2=" + ai2.decrementAndGet());
1480 // AtomicInteger ai2 =new AtomicInteger(0);
1482 // private boolean testCluster(int subject, ClusterI proxy) {
1484 // if (proxy == null)
1485 // System.out.println("null cluster: " + Integer.toString(subject, 16));
1487 // return proxy != null;
// Returns the id of the cluster holding the given resource key, or 0 for negative
// keys and for keys the cluster table has no cluster for.
1491 long getCluster(int id) {
1493 if(id < 0) return 0;
1494 ClusterI proxy = clusterTable.getClusterByResourceKey(id);
1495 if(proxy == null) return 0;
1496 else return proxy.getClusterId();
// Resolves a textual resource reference to an integer key using the resource
// serializer of the serialisation support. Yields 0 when the serializer returns
// no resource.
// NOTE(review): presumably an InvalidResourceReferenceException is translated into
// the declared ResourceNotFoundException - confirm.
1500 public int getRandomAccessReference(String id) throws ResourceNotFoundException {
1503 Resource res = serializationSupport.getResourceSerializer().getResource(id);
1504 if(res == null) return 0;
1505 else return ((ResourceImpl)res).id;
1506 } catch (InvalidResourceReferenceException e) {
1507 //e.printStackTrace();
// Makes sure the data needed to answer (subject, predicate) is available: loads
// virtual statements through SessionImplSocket.loadVirtualStatements and, when the
// subject's cluster is not loaded, starts loading the cluster.
// NOTE(review): this overload uses checkedGetClusterByResourceKey while the
// two-argument overload uses getClusterByResourceKey - confirm that is intended.
1514 public void ensureLoaded(final ReadGraphImpl graph, final int subject, final int predicate) {
1518 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1522 final ClusterI cluster = clusterTable.checkedGetClusterByResourceKey(subject);
1524 if(cluster.isLoaded()) {
// Cluster is present; only the virtual statements may still be missing.
1526 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1528 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
// Diagnostic: dumps the current call stack before the cluster load is started.
1536 new Exception().printStackTrace();
1538 cluster.load(session.clusterTranslator, new Runnable() {
// Same virtual-statement check as above, placed inside the Runnable handed to load.
1543 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1545 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
/**
 * Triggers loading of the data needed to read all statements of
 * {@code subject}: virtual statements through
 * {@code SessionImplSocket.loadVirtualStatements} and the owning cluster
 * through {@code cluster.load}.
 * NOTE(review): the branch structure between the visible statements is not
 * shown here; confirm which call belongs to which branch.
 *
 * @param graph   read graph handed on to the virtual statement loader
 * @param subject resource key whose statements are needed
 */
1563 public void ensureLoaded(final ReadGraphImpl graph, final int subject) {
// Loads the virtual statements of the subject; the callback passed is a no-op.
// NOTE(review): the guard for this call is not visible; presumably it covers
// subjects that have no persistent cluster (confirm).
1567 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
// NOTE(review): the predicate-specific overload uses checkedGetClusterByResourceKey,
// while this one uses the plain lookup, which getCluster treats as possibly
// returning null; confirm the cluster cannot be null here.
1571 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1573 if(cluster.isLoaded()) {
// Virtual statements are loaded only when the cluster reports virtual data, the
// subject is registered in virtuals, and they are not yet loaded for the subject.
1575 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1577 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1585 // System.err.println("cluster not loaded " + subject);
// Prints the current stack trace to stderr.
// NOTE(review): looks like a diagnostic for reaching the not-loaded path; confirm it is still wanted.
1586 new Exception().printStackTrace();
// Ask the cluster to load itself, passing the session's cluster translator and a
// Runnable; presumably the Runnable runs once loading is done (confirm).
1588 cluster.load(session.clusterTranslator, new Runnable() {
// Same virtual-statement check as in the already-loaded case above.
1593 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1595 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1613 public boolean isLoaded(int subject) {
1614 ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1615 return cluster.isLoaded();
1619 public void ceased(int thread) {
1621 session.ceased(thread);
1626 public Object getLock() {
1628 return session.requestManager;
/**
 * Searches the registered transient graphs for the one that contains the
 * statement ({@code subject}, {@code predicate}, {@code object}).
 *
 * @param subject   resource key of the statement's subject
 * @param predicate resource key of the statement's predicate
 * @param object    resource key of the statement's object
 * @return the first provider whose objects for (subject, predicate) include
 *         {@code object}; {@code null} when the subject's cluster reports no
 *         virtual data or the subject is not registered in virtuals
 */
1633 public VirtualGraph getProvider(int subject, int predicate, int object) {
// NOTE(review): the lookup result is cast and dereferenced without a null check,
// and any guard around these two statements is not visible here (confirm).
1636 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1637 // This persistent resource does not have virtual statements => must deny in persistent graph
1638 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
// Scan every provider and return the first one that holds the exact object.
1641 for(TransientGraph g : virtualGraphServerSupport.providers) {
1642 for (final int id : g.getObjects(subject, predicate)) {
1643 if(object == id) return g;
// NOTE(review): the statement following this comment is not visible; presumably null is returned (confirm).
1647 // Nothing found from virtual graphs
/**
 * Searches the registered transient graphs for a provider holding objects for
 * ({@code subject}, {@code predicate}).
 *
 * @param subject   resource key of the subject
 * @param predicate resource key of the predicate
 * @return {@code null} when the subject's cluster reports no virtual data,
 *         when the subject is not registered in virtuals, or when a further
 *         provider matches while {@code result} is already set.
 *         NOTE(review): the assignment to {@code result} and the final return
 *         are not visible here; presumably the single match is returned (confirm).
 */
1653 public VirtualGraph getProvider(int subject, int predicate) {
// NOTE(review): the lookup result is cast and dereferenced without a null check,
// and any guard around these two statements is not visible here (confirm).
1656 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1657 // This persistent resource does not have virtual statements => must deny in persistent graph
1658 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1661 TransientGraph result = null;
1662 for(TransientGraph g : virtualGraphServerSupport.providers) {
// A provider counts as a match when it has at least one object for (subject, predicate).
1663 if(g.getObjects(subject, predicate).length > 0) {
1664 // Found multiple, return null;
1665 if(result != null) return null;
/**
 * Searches the registered transient graphs for a provider that holds a value
 * for {@code subject}.
 *
 * @param subject resource key whose value provider is wanted
 * @return {@code null} when the subject's cluster reports no virtual data,
 *         when the subject is not registered in virtuals, or when a further
 *         provider matches while {@code result} is already set.
 *         NOTE(review): the assignment to {@code result} and the final return
 *         are not visible here; presumably the single match is returned (confirm).
 */
1674 public VirtualGraph getValueProvider(int subject) {
// NOTE(review): the lookup result is cast and dereferenced without a null check,
// and any guard around these two statements is not visible here (confirm).
1677 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1678 // This persistent resource does not have virtual statements => must deny in persistent graph
1679 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1682 TransientGraph result = null;
1683 for(TransientGraph g : virtualGraphServerSupport.providers) {
// A provider counts as a match when it returns a non-null value for the subject.
1684 if(g.getValue(subject) != null) {
// A second match makes the answer ambiguous, so null is returned.
1685 if(result != null) return null;
1694 public void exit(Throwable t) {
1698 private void analyseProblem(ClusterI cluster) {
1700 if(cluster instanceof ClusterSmall)
1702 ((ClusterSmall)cluster).check();
1703 } catch (DatabaseException e) {
1704 e.printStackTrace();