1 package fi.vtt.simantics.procore.internal;
3 import java.io.ByteArrayInputStream;
4 import java.io.InputStream;
5 import java.util.Collection;
6 import java.util.function.Consumer;
8 import org.simantics.db.Resource;
9 import org.simantics.db.Session;
10 import org.simantics.db.Statement;
11 import org.simantics.db.VirtualGraph;
12 import org.simantics.db.WriteGraph;
13 import org.simantics.db.common.StandardStatement;
14 import org.simantics.db.common.request.WriteRequest;
15 import org.simantics.db.common.utils.Logger;
16 import org.simantics.db.exception.DatabaseException;
17 import org.simantics.db.exception.InvalidResourceReferenceException;
18 import org.simantics.db.exception.ResourceNotFoundException;
19 import org.simantics.db.impl.ClusterI;
20 import org.simantics.db.impl.ClusterSupport;
21 import org.simantics.db.impl.ForEachObjectContextProcedure;
22 import org.simantics.db.impl.ForEachObjectProcedure;
23 import org.simantics.db.impl.ResourceImpl;
24 import org.simantics.db.impl.TransientGraph;
25 import org.simantics.db.impl.VirtualGraphImpl;
26 import org.simantics.db.impl.graph.ReadGraphImpl;
27 import org.simantics.db.impl.query.IntProcedure;
28 import org.simantics.db.impl.query.QueryProcessor;
29 import org.simantics.db.impl.query.QuerySupport;
30 import org.simantics.db.impl.support.BuiltinSupport;
31 import org.simantics.db.impl.support.ResourceSupport;
32 import org.simantics.db.procore.cluster.ClusterImpl;
33 import org.simantics.db.procore.cluster.ClusterSmall;
34 import org.simantics.db.service.SerialisationSupport;
35 import org.simantics.utils.DataContainer;
36 import org.slf4j.LoggerFactory;
38 import gnu.trove.set.hash.TIntHashSet;
40 public class QuerySupportImpl implements QuerySupport {
42 private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(QuerySupportImpl.class);
44 final SessionImplSocket session;
46 final ClusterTable clusterTable;
47 final BuiltinSupport builtinSupport;
48 final ClusterSupport clusterSupport;
49 final ResourceSupport resourceSupport;
50 final SerialisationSupport serializationSupport;
51 final VirtualGraphServerSupportImpl virtualGraphServerSupport;
52 final GraphSession graphSession;
53 final SessionRequestManager syncThreads;
56 private boolean pendingPrimitives = false;
58 QuerySupportImpl(SessionImplSocket session, ClusterSupport clusterSupport, SerialisationSupport serializationSupport, SessionRequestManager syncThreads) {
59 this.session = session;
60 this.state = session.state;
61 this.clusterTable = session.clusterTable;
62 this.resourceSupport = session.resourceSupport;
63 this.virtualGraphServerSupport = session.virtualGraphServerSupport;
64 this.graphSession = session.graphSession;
65 this.builtinSupport = session.builtinSupport;
66 this.clusterSupport = clusterSupport;
67 this.serializationSupport = serializationSupport;
68 this.syncThreads = syncThreads;
69 this.root = getBuiltin("http:/");
70 assert(this.session != null);
71 assert(this.state != null);
72 assert(this.clusterTable != null);
73 assert(this.resourceSupport != null);
74 assert(this.virtualGraphServerSupport != null);
75 assert(this.graphSession != null);
76 assert(this.builtinSupport != null);
77 assert(this.clusterSupport != null);
78 assert(this.serializationSupport != null);
79 assert(this.syncThreads != null);
83 public ResourceSupport getSupport() {
84 return resourceSupport;
88 public Statement getStatement(int s, int p, int o) {
89 return getStatement(null, s, p, o);
93 public Statement getStatement(ReadGraphImpl graph, int s, int p, int o) {
94 Resource sr = getResource(s);
95 Resource pr = getResource(p);
96 Resource or = getResource(o);
97 return new StandardStatement(sr, pr, or);
101 public Session getSession() {
106 public long getClusterId(int id) {
107 ClusterI cluster = clusterTable.getClusterByResourceKey(id);
108 if(cluster == null) return 0;
109 return cluster.getClusterId();
113 public boolean isImmutable(int id) {
114 // Virtuals are mutable
115 if(id < 0) return false;
116 // Root library is mutable
117 if(root == id) return false;
118 // Anything goes in service mode
119 if(session.serviceMode > 0) return false;
120 return clusterTable.isImmutable(id);
124 public int getId(Resource resource) {
125 if (resource instanceof ResourceImpl)
126 return ((ResourceImpl)resource).id;
131 public Resource getResource(int id) {
133 return serializationSupport.getResource(id);
134 } catch (DatabaseException e) {
135 // TODO: consider changing this method to throw something
136 LOGGER.error("getResource(" + id + ") failed", e);
142 public boolean resume(ReadGraphImpl graph) {
144 return syncThreads.session.queryProvider2.resume(graph);
149 // final public void sync(int resumeThread, final SessionRunnable runnable) {
151 // syncThreads.session.queryProvider2.schedule(Integer.MIN_VALUE, new SessionTask(resumeThread) {
154 // public void run(int thread) {
155 // runnable.run(thread);
163 // final public int nextSyncThread() {
164 // throw new Error();
165 //// return syncThreads.nextThread();
169 public void dirtyPrimitives() {
170 session.dirtyPrimitives = true;
171 if(state.getWriteCount() == 0 && !pendingPrimitives) {
172 pendingPrimitives = true;
173 session.asyncRequest(new WriteRequest() {
176 public void perform(WriteGraph graph) throws DatabaseException {
177 pendingPrimitives = false;
186 final public void aboutToRead() {
190 // public void increaseReferenceCount(int callerThread, int subject) {
193 // ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
194 // if (null == proxy)
196 // proxy.increaseReferenceCount(callerThread, 1);
200 // public void decreaseReferenceCount(int callerThread, int subject) {
203 // ClusterProxy proxy = clusterTable.getClusterByResourceKey(subject);
204 // if (null == proxy)
206 // proxy.decreaseReferenceCount(callerThread, 1);
210 public void getObjects4(final ReadGraphImpl graph, final int subject, final ForEachObjectProcedure procedure) {
214 for(TransientGraph g : virtualGraphServerSupport.providers) {
215 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
217 // int suggestSchedule = graph.processor.processor.resourceThread(id);
218 // if(graph.callerThread == suggestSchedule) {
220 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
221 } catch (DatabaseException e) {
222 LOGGER.error("Unexpected exception while handling object", e);
225 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
228 // public void run(int thread) {
229 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
238 procedure.finished(graph);
239 } catch (DatabaseException e) {
240 LOGGER.error("Unexpected exception while handling objects", e);
247 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
248 if(!cluster.isLoaded()) {
251 } catch (DatabaseException e) {
252 LOGGER.error("Unexpected exception while handling objects", e);
254 getObjects4(graph, subject, procedure);
258 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
260 for(TransientGraph g : virtualGraphServerSupport.providers) {
261 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
263 procedure.execute(graph, new ResourceImpl(resourceSupport, id));
264 } catch (DatabaseException e) {
265 LOGGER.error("Unexpected exception while handling objects", e);
269 // int suggestSchedule = graph.processor.processor.resourceThread(id);
270 // if(graph.callerThread == suggestSchedule) {
272 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
275 // public void run(int thread) {
276 // procedure.execute(graph.newAsync(thread), new ResourceImpl(resourceSupport, id));
285 cluster.forObjects(graph, subject, procedure);
286 } catch (DatabaseException e) {
293 cluster.forObjects(graph, subject, procedure);
294 } catch (DatabaseException e) {
302 public <C> void getObjects4(final ReadGraphImpl graph, final int subject, final C context, final ForEachObjectContextProcedure<C> procedure) {
306 for(TransientGraph g : virtualGraphServerSupport.providers) {
307 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
309 // int suggestSchedule = graph.processor.processor.resourceThread(id);
310 // if(graph.callerThread == suggestSchedule) {
313 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
314 } catch (DatabaseException e) {
315 LOGGER.error("Unexpected exception while handling objects", e);
321 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
324 // public void run(int thread) {
325 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
335 procedure.finished(graph, context);
336 } catch (DatabaseException e) {
337 LOGGER.error("Unexpected exception while handling objects", e);
346 final ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
347 if(!cluster.isLoaded()) {
348 cluster.load(session.clusterTranslator, new Runnable() {
352 getObjects4(graph, subject, context, procedure);
359 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
361 for(TransientGraph g : virtualGraphServerSupport.providers) {
362 for (final int id : g.getObjects(subject, procedure.predicateKey)) {
363 // int suggestSchedule = graph.processor.processor.resourceThread(id);
364 // if(graph.callerThread == suggestSchedule) {
366 procedure.execute(graph, context, new ResourceImpl(resourceSupport, id));
367 } catch (DatabaseException e) {
368 LOGGER.error("Unexpected exception while handling objects", e);
371 // graph.processor.processor.schedule(graph.callerThread, new SessionTask(suggestSchedule) {
374 // public void run(int thread) {
375 // procedure.execute(graph.newAsync(thread), context, new ResourceImpl(resourceSupport, id));
384 cluster.forObjects(graph, subject, context, procedure);
385 } catch (DatabaseException e) {
392 cluster.forObjects(graph, subject, context, procedure);
393 } catch (DatabaseException e) {
402 public int getSingleInstance(final int subject) {
404 // Do not process this information for virtual resources
405 if(subject < 0) return 0;
407 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
409 System.out.println("null cluster: " + Integer.toString(subject, 16));
410 assert (cluster != null);
414 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
415 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
416 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
423 } catch (DatabaseException e) {
425 Logger.defaultLogError(e);
429 // This happens is the resource is bogus
430 catch (Throwable t) {
432 analyseProblem(cluster);
434 Logger.defaultLogError(t);
443 public int getSingleSuperrelation(final int subject) {
445 // Do not process this information for virtual resources
446 if(subject < 0) return 0;
448 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
450 System.out.println("null cluster: " + Integer.toString(subject, 16));
451 assert (cluster != null);
455 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subject, clusterSupport);
456 if(ClusterI.CompleteTypeEnum.SubrelationOf == type) {
457 int result = cluster.getCompleteObjectKey(subject, clusterSupport);
464 } catch (DatabaseException e) {
466 Logger.defaultLogError(e);
474 public boolean getObjects(final ReadGraphImpl graph, final int subject, final int predicate, final IntProcedure procedure) throws DatabaseException {
476 assert (subject != 0);
477 assert (predicate != 0);
481 boolean found = false;
483 for(TransientGraph g : virtualGraphServerSupport.providers) {
484 for (final int id : g.getObjects(subject, predicate)) {
486 procedure.execute(graph, id);
494 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
495 assert(cluster.isLoaded());
497 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
499 final DataContainer<Boolean> found = new DataContainer<Boolean>(Boolean.FALSE);
500 final TIntHashSet result = new TIntHashSet(5);
502 for(TransientGraph g : virtualGraphServerSupport.providers) {
503 for (final int id : g.getObjects(subject, predicate)) {
506 if (result.add(id)) {
507 procedure.execute(graph, id);
513 // Virtual predicates are not found from persistent clusters
514 if(predicate < 0) return found.get();
516 // wheels within wheels
517 final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
520 public boolean execute(Object context, int object) {
523 if (result.add(object)) {
525 procedure.execute(graph, object);
526 } catch (DatabaseException e) {
527 Logger.defaultLogError(e);
531 return false; // continue looping
538 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
539 } catch (DatabaseException e) {
547 // Virtual predicates are not found from persistent clusters
548 if(predicate < 0) return false;
550 class A implements ClusterI.ObjectProcedure<Object> {
552 boolean found = false;
555 public boolean execute(Object context, int object) {
559 procedure.execute(graph, object);
560 } catch (DatabaseException e) {
561 Logger.defaultLogError(e);
563 return false; // continue looping
567 public boolean found() {
573 // wheels within wheels
574 final A proc = new A();
577 cluster.forObjects(subject, predicate, proc, null, clusterSupport);
578 } catch (DatabaseException e) {
579 Logger.defaultLogError(e);
581 // This happens if resource is bogus
582 catch (Throwable t) {
584 analyseProblem(cluster);
586 Logger.defaultLogError(t);
597 // public boolean getFunctionalObject(final ReadGraphImpl graph, final int subject, final int predicate,
598 // final IntProcedure procedure) {
600 // assert (subject != 0);
601 // assert (predicate != 0);
605 // boolean found = false;
607 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
608 // if (providers != null) {
610 // for (VirtualGraph provider : providers) {
612 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
614 // procedure.execute(graph, id);
625 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
627 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
629 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
630 // final TIntHashSet result = new TIntHashSet(5);
632 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
633 // if (providers != null) {
635 // for (VirtualGraph provider : providers) {
637 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
639 // procedure.execute(graph, id);
646 // // wheels within wheels
647 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
650 // public boolean execute(int callerThread, Object context, int object) {
653 // System.out.println("-found object " + object);
654 // if (result.add(object)) {
655 // procedure.execute(graph.newAsync(callerThread), object);
658 // return false; // continue looping
663 // public boolean found() {
664 // throw new UnsupportedOperationException();
670 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
671 // } catch (DatabaseException e) {
672 // Logger.defaultLogError(e);
674 // } catch (Throwable t) {
675 // Logger.defaultLogError(t);
676 // t.printStackTrace();
679 // return found.get();
683 // // wheels within wheels
684 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
686 // boolean found = false;
689 // public boolean execute(int callerThread, Object context, int object) {
692 // procedure.execute(graph.newAsync(callerThread), object);
693 // return false; // continue looping
698 // public boolean found() {
703 // public String toString() {
704 // return "Wheels for " + procedure;
710 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
711 // } catch (DatabaseException e) {
712 // Logger.defaultLogError(e);
714 // } catch (Throwable t) {
715 // t.printStackTrace();
716 // Logger.defaultLogError(t);
720 // return proc.found();
727 public int getFunctionalObject(final int subject, final int predicate) {
729 assert (subject != 0);
730 assert (predicate != 0);
736 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
737 if (providers != null) {
739 for (VirtualGraph provider : providers) {
741 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
742 if(found == 0) found = id;
751 // if(found == -1) return 0;
752 // else return found;
756 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
758 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject)) {
761 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
762 if (providers != null) {
764 for (VirtualGraph provider : providers) {
766 for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
767 if(result == 0) result = id;
775 if(result != 0) return result;
778 return cluster.getSingleObject(subject, predicate, clusterSupport);
779 } catch (DatabaseException e) {
786 return cluster.getSingleObject(subject, predicate, clusterSupport);
787 } catch (DatabaseException e) {
790 // This comes if the resource is bogus
791 catch (Throwable t) {
793 analyseProblem(cluster);
795 Logger.defaultLogError(t);
806 // public boolean getStatements(final ReadGraphImpl graph, final int subject, final int predicate,
807 // final TripleIntProcedure procedure, final Statements entry) {
809 // assert (subject != 0);
810 // assert (predicate != 0);
812 // final TIntHashSet result = new TIntHashSet(16);
813 // final DataContainer<Boolean> found = new DataContainer<Boolean>(false);
815 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
817 // if (providers != null) {
819 // for (VirtualGraph provider : providers) {
821 // for (int id : ((VirtualGraphImpl)provider).getObjects(subject, predicate)) {
825 // if (result.add(id)) {
826 // if (null != entry) {
827 // entry.addOrSet(subject, predicate, id);
828 // procedure.execute(graph, subject, predicate, id);
829 // return true; // break;
831 // procedure.execute(graph, subject, predicate, id);
842 // return found.get();
844 // final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
845 // assert (cluster != null);
847 // // wheels within wheels
848 // final ClusterI.ObjectProcedure<Object> proc = new ClusterI.ObjectProcedure<Object>() {
850 // public boolean execute(int callerThread, Object context, int object) {
854 // if (result.add(object)) {
855 // if (null != entry) {
856 // entry.addOrSet(subject, predicate, object);
857 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
858 // return true; // break;
860 // procedure.execute(graph.newAsync(callerThread), subject, predicate, object);
864 // return false; // continue looping
869 // public boolean found() {
870 // throw new UnsupportedOperationException();
876 // cluster.forObjects(graph.callerThread, subject, predicate, proc, null, clusterSupport);
877 // } catch (DatabaseException e) {
878 // Logger.defaultLogError(e);
880 // return found.get();
885 public org.simantics.db.DirectStatements getStatements(final ReadGraphImpl graph, final int subject, final QueryProcessor processor, boolean ignoreVirtual) {
887 assert (subject != 0);
889 final DirectStatementsImpl result = new DirectStatementsImpl(resourceSupport, subject);
891 if (!ignoreVirtual) {
892 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
893 if (providers != null) {
894 for (TransientGraph provider : providers) {
895 for (int p : provider.getPredicates(subject)) {
896 for (int o : provider.getObjects(subject, p)) {
897 result.addStatement(p, o);
907 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
908 assert (cluster != null);
909 doGetStatements(graph, cluster, subject, result);
917 public void getPredicates(final ReadGraphImpl graph, final int subject, final IntProcedure procedure) throws DatabaseException {
919 final TIntHashSet result = new TIntHashSet(16);
921 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(subject);
923 if (providers != null) {
925 for (VirtualGraph provider : providers) {
927 for (int id : ((VirtualGraphImpl)provider).getPredicates(subject)) {
929 if (result.add(id)) {
930 procedure.execute(graph, id);
940 procedure.finished(graph);
944 ClusterI proxy = clusterTable.getClusterByResourceKey(subject);
945 assert (proxy != null);
947 final DataContainer<Integer> got = new DataContainer<Integer>(0);
948 ClusterI.PredicateProcedure<Object> proc = new ClusterI.PredicateProcedure<Object>() {
950 public boolean execute(Object context, int predicate, int oi) {
951 if (result.add(predicate)) {
953 procedure.execute(graph, predicate);
954 } catch (DatabaseException e) {
955 Logger.defaultLogError(e);
958 got.set(got.get() + 1);
959 return false; // continue looping
963 proxy.forPredicates(subject, proc, null, clusterSupport);
964 } catch (DatabaseException e) {
965 Logger.defaultLogError(e);
967 procedure.finished(graph);
974 // public void getValue(ReadGraphImpl graph, int resource, InternalProcedure<byte[]> procedure) {
976 // if(resource < 0) {
978 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
980 // if (providers != null) {
982 // for (VirtualGraph provider : providers) {
984 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
985 // if (value != null) {
986 // procedure.execute(graph, (byte[])value);
998 // ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
999 // if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1001 // Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1003 // if (providers != null) {
1005 // for (VirtualGraph provider : providers) {
1007 // Object value = ((VirtualGraphImpl)provider).getValue(resource);
1008 // if (value != null) {
1009 // procedure.execute(graph, (byte[])value);
1019 // byte[] data = cluster.getValue(resource, clusterSupport);
1020 // if (null == data || 0 == data.length) {
1021 // procedure.execute(graph, null);
1023 // procedure.execute(graph, data);
1026 // } catch (DatabaseException e) {
1027 // Logger.defaultLogError(e);
1034 // byte[] data = cluster.getValue(resource, clusterSupport);
1035 // if (null == data || 0 == data.length) {
1036 // procedure.execute(graph, null);
1038 // procedure.execute(graph, data);
1041 // } catch (DatabaseException e) {
1042 // Logger.defaultLogError(e);
1051 public byte[] getValue(ReadGraphImpl graph, int resource) {
1055 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1057 if (providers != null) {
1059 for (VirtualGraph provider : providers) {
1061 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1062 if (value != null) {
1063 return (byte[])value;
1074 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1075 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1077 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1079 if (providers != null) {
1081 for (VirtualGraph provider : providers) {
1083 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1084 if (value != null) {
1085 return (byte[])value;
1094 byte[] data = cluster.getValue(resource, clusterSupport);
1095 if (null != data && 0 != data.length) {
1099 } catch (DatabaseException e) {
1100 Logger.defaultLogError(e);
1109 byte[] data = cluster.getValue(resource, clusterSupport);
1110 if (null != data && 0 != data.length) {
1114 } catch (DatabaseException e) {
1115 Logger.defaultLogError(e);
1125 public InputStream getValueStream(ReadGraphImpl graph, int resource) {
1129 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1131 if (providers != null) {
1133 for (VirtualGraph provider : providers) {
1135 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1136 if (value != null) {
1137 return new ByteArrayInputStream((byte[])value);
1148 ClusterI cluster = clusterTable.getClusterByResourceKey(resource);
1149 if (cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(resource)) {
1151 Collection<TransientGraph> providers = virtualGraphServerSupport.getVirtualGraphs(resource);
1153 if (providers != null) {
1155 for (VirtualGraph provider : providers) {
1157 Object value = ((VirtualGraphImpl)provider).getValue(resource);
1158 if (value != null) {
1159 return new ByteArrayInputStream((byte[])value);
1168 return cluster.getValueStream(resource, clusterSupport);
1170 } catch (DatabaseException e) {
1171 Logger.defaultLogError(e);
1180 return cluster.getValueStream(resource, clusterSupport);
1182 } catch (DatabaseException e) {
1183 Logger.defaultLogError(e);
1193 public void requestCluster(ReadGraphImpl graph, final long clusterId, final Runnable r) {
1195 class CallbackAdapter implements Consumer<DatabaseException> {
1197 CallbackAdapter(final Runnable r) {
1201 public void accept(DatabaseException e) {
1203 e.printStackTrace();
1210 double p = clusterTable.getLoadProbability();
1211 // System.out.print("Load cluster " + clusterId + " with probability " + p +
1213 final ClusterI proxy = clusterSupport.getClusterByClusterId(clusterId);
1214 if (!proxy.isLoaded()) {
1216 if (Math.random() < p) {
1217 proxy.load(new CallbackAdapter(r));
1228 public int getBuiltin(String uri) {
1229 return builtinSupport.getBuiltin(uri);
1233 public void checkTasks() {
1234 System.out.println(syncThreads.toString());
1238 // public void asyncWrite(Write request) {
1240 //// if(plainWrite(writer) && sameProvider(request)) {
1241 //// writer.writeSupport.pushRequest(request);
1243 // asyncRequest(request);
1254 private void doGetStatements(ReadGraphImpl graph, final ClusterI cluster, final int subject, final DirectStatementsImpl result) {
1256 final class Proc implements ClusterI.PredicateProcedure<Object> {
1259 public boolean execute(Object context, final int predicate, final int objectIndex) {
1261 doExecute(null, predicate, objectIndex);
1266 private void doExecute(Object context, final int predicate, final int objectIndex) {
1269 cluster.forObjects(subject, predicate, new ClusterI.ObjectProcedure<Object>() {
1272 public boolean execute(Object context, int object) {
1273 result.addStatement(predicate, object);
1277 }, null, clusterSupport);
1278 } catch (DatabaseException e) {
1279 e.printStackTrace();
1287 cluster.forPredicates(subject, new Proc(), null, clusterSupport);
1288 } catch (DatabaseException e) {
1289 e.printStackTrace();
1294 // private void getDirectObjects4(final int callerThread, final ClusterI cluster, final int subject, final int predicate, final QueryProcessor processor, final ReadGraphImpl graph, final ForEachObjectProcedure procedure) {
1296 // if(!cluster.isLoaded()) {
1298 // requestCluster(callerThread, cluster.getClusterId(), new Callback<Integer>() {
1301 // public void run(Integer i) {
1303 // processor.schedule(i, new SessionTask(callerThread) {
1306 // public void run(int thread) {
1308 // getDirectObjects4(thread, cluster, subject, predicate, processor, graph, procedure);
1321 // cluster.forObjects(graph, subject, predicate, procedure);
1322 // } catch (DatabaseException e) {
1323 // e.printStackTrace();
1326 //// procedure.finished(graph);
1327 //// graph.state.barrier.dec();
1329 //// System.err.println("ai2=" + ai2.decrementAndGet());
1336 // AtomicInteger ai2 =new AtomicInteger(0);
1338 // private boolean testCluster(int subject, ClusterI proxy) {
1340 // if (proxy == null)
1341 // System.out.println("null cluster: " + Integer.toString(subject, 16));
1343 // return proxy != null;
1347 long getCluster(int id) {
1349 if(id < 0) return 0;
1350 ClusterI proxy = clusterTable.getClusterByResourceKey(id);
1351 if(proxy == null) return 0;
1352 else return proxy.getClusterId();
1356 public int getRandomAccessReference(String id) throws ResourceNotFoundException {
1359 Resource res = serializationSupport.getResourceSerializer().getResource(id);
1360 if(res == null) return 0;
1361 else return ((ResourceImpl)res).id;
1362 } catch (InvalidResourceReferenceException e) {
1363 //e.printStackTrace();
1370 public void ensureLoaded(final ReadGraphImpl graph, final int subject, final int predicate) {
1374 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1378 final ClusterI cluster = clusterTable.checkedGetClusterByResourceKey(subject);
1380 if(cluster.isLoaded()) {
1382 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1384 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1392 //new Exception().printStackTrace();
1394 cluster.load(session.clusterTranslator, new Runnable() {
1399 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject, predicate)) {
1401 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, predicate, g -> {});
1419 public void ensureLoaded(final ReadGraphImpl graph, final int subject) {
1423 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1427 final ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1429 if(cluster.isLoaded()) {
1431 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1433 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1441 // System.err.println("cluster not loaded " + subject);
1442 //new Exception().printStackTrace();
1444 cluster.load(session.clusterTranslator, new Runnable() {
1449 if(cluster.hasVirtual() && virtualGraphServerSupport.virtuals.contains(subject) && !SessionImplSocket.areVirtualStatementsLoaded(virtualGraphServerSupport, subject)) {
1451 SessionImplSocket.loadVirtualStatements(virtualGraphServerSupport, graph, subject, g -> {});
1469 public boolean isLoaded(int subject) {
1470 ClusterI cluster = clusterTable.getClusterByResourceKey(subject);
1471 return cluster.isLoaded();
1475 public void ceased(int thread) {
1477 session.ceased(thread);
1482 public Object getLock() {
1484 return session.requestManager;
1489 public VirtualGraph getProvider(int subject, int predicate, int object) {
1492 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1493 // This persistent resource does not have virtual statements => must deny in persistent graph
1494 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1497 for(TransientGraph g : virtualGraphServerSupport.providers) {
1498 for (final int id : g.getObjects(subject, predicate)) {
1499 if(object == id) return g;
1503 // Nothing found from virtual graphs
1509 public VirtualGraph getProvider(int subject, int predicate) {
1512 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1513 // This persistent resource does not have virtual statements => must deny in persistent graph
1514 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1517 TransientGraph result = null;
1518 for(TransientGraph g : virtualGraphServerSupport.providers) {
1519 if(g.getObjects(subject, predicate).length > 0) {
1520 // Found multiple, return null;
1521 if(result != null) return null;
1530 public VirtualGraph getValueProvider(int subject) {
1533 ClusterImpl cluster = (ClusterImpl)clusterTable.getClusterByResourceKey(subject);
1534 // This persistent resource does not have virtual statements => must deny in persistent graph
1535 if(!cluster.hasVirtual() || !virtualGraphServerSupport.virtuals.contains(subject)) return null;
1538 TransientGraph result = null;
1539 for(TransientGraph g : virtualGraphServerSupport.providers) {
1540 if(g.getValue(subject) != null) {
1541 if(result != null) return null;
1550 public void exit(Throwable t) {
1554 private void analyseProblem(ClusterI cluster) {
1556 if(cluster instanceof ClusterSmall)
1558 ((ClusterSmall)cluster).check();
1559 } catch (DatabaseException e) {
1560 LOGGER.error("ClusterSmall.check failed", e);