1 package fi.vtt.simantics.procore.internal;
3 import org.simantics.db.DirectStatements;
4 import org.simantics.db.ReadGraph;
5 import org.simantics.db.RelationInfo;
6 import org.simantics.db.Resource;
7 import org.simantics.db.exception.AssumptionException;
8 import org.simantics.db.exception.DatabaseException;
9 import org.simantics.db.exception.NoSingleResultException;
10 import org.simantics.db.impl.ClusterI;
11 import org.simantics.db.impl.ClusterI.ClusterTypeEnum;
12 import org.simantics.db.impl.ForEachObjectContextProcedure;
13 import org.simantics.db.impl.ForEachObjectProcedure;
14 import org.simantics.db.impl.ForPossibleRelatedValueContextProcedure;
15 import org.simantics.db.impl.ForPossibleRelatedValueProcedure;
16 import org.simantics.db.impl.ResourceImpl;
17 import org.simantics.db.impl.TransientGraph;
18 import org.simantics.db.impl.graph.ReadGraphImpl;
19 import org.simantics.db.procedure.SyncContextMultiProcedure;
20 import org.simantics.db.procedure.SyncContextProcedure;
21 import org.simantics.db.procedure.SyncMultiProcedure;
22 import org.simantics.db.procedure.SyncProcedure;
23 import org.simantics.db.procore.cluster.ClusterBig;
24 import org.simantics.db.procore.cluster.ClusterImpl;
25 import org.simantics.db.procore.cluster.ClusterSmall;
26 import org.simantics.db.procore.cluster.ResourceTableSmall;
27 import org.simantics.db.procore.cluster.ValueTableSmall;
28 import org.simantics.db.service.DirectQuerySupport;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
33 * Note that the direct value retrieval in this implementation only supports
34 * String-type literals - nothing else!
36 * This implementation is mainly intended for optimizing database indexing
// DirectQuerySupport implementation that is bound to one SessionImplSocket.
39 public class DirectQuerySupportImpl implements DirectQuerySupport {
// Class-wide SLF4J logger; used by the methods below to report DatabaseExceptions
// and unexpected Throwables that are not propagated to the caller.
41 private static final Logger LOGGER = LoggerFactory.getLogger(DirectQuerySupportImpl.class);
// Owning session. The methods of this class read its querySupport, queryProvider2,
// clusterTable, clusterTranslator, resourceSupport and virtualGraphServerSupport members.
43 final private SessionImplSocket session;
/**
 * Creates the support object for the given session. The constructor has no
 * access modifier, i.e. it is package-private.
 *
 * @param session session stored in the {@code session} field; not checked for null here
 */
45 DirectQuerySupportImpl(SessionImplSocket session) {
46 this.session = session;
/**
 * Returns the direct statements of {@code subject} by delegating to the query
 * processor of the given graph with {@code true} as the last argument.
 * NOTE(review): judging by the method name the flag presumably restricts the
 * result to persistent statements - confirm against the processor's getDirectStatements.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject resource whose statements are requested
 * @return the value returned by the processor
 */
50 final public DirectStatements getDirectPersistentStatements(ReadGraph graph, final Resource subject) {
51 ReadGraphImpl impl = (ReadGraphImpl)graph;
52 return impl.processor.getDirectStatements(impl, subject, true);
/**
 * Returns the direct statements of {@code subject} by delegating to the query
 * processor of the given graph with {@code false} as the last argument
 * (getDirectPersistentStatements passes {@code true} instead).
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject resource whose statements are requested
 * @return the value returned by the processor
 */
56 final public DirectStatements getDirectStatements(ReadGraph graph, final Resource subject) {
57 ReadGraphImpl impl = (ReadGraphImpl)graph;
58 return impl.processor.getDirectStatements(impl, subject, false);
/**
 * Fetches the RelationInfo of a resource from the query processor of the given graph.
 * The compile* methods of this class call this with the relation they are compiling for.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject resource handed to the processor's getRelationInfo
 * @return the RelationInfo produced by the processor
 * @throws DatabaseException declared by this method; raised by the processor lookup
 */
62 public RelationInfo getRelationInfo(ReadGraph graph, Resource subject) throws DatabaseException {
63 ReadGraphImpl impl = (ReadGraphImpl)graph;
64 return impl.processor.getRelationInfo(impl, subject);
/**
 * Wraps {@code user} into a ForEachObjectProcedure that carries the relation's
 * RelationInfo and integer key, so that forEachObjectCompiled can later be called
 * with the returned object (that method casts its procedure to ForEachObjectProcedure).
 *
 * @param graph read graph used for the RelationInfo lookup
 * @param relation must be a ResourceImpl; its {@code id} becomes the predicate key
 * @param user procedure that is handed to the wrapper
 * @return the new ForEachObjectProcedure
 * @throws DatabaseException if the RelationInfo lookup fails
 */
68 public SyncMultiProcedure<Resource> compileForEachObject(ReadGraph graph, final Resource relation, SyncMultiProcedure<Resource> user) throws DatabaseException {
70 RelationInfo info = getRelationInfo(graph, relation);
71 final int predicateKey = ((ResourceImpl)relation).id;
// ForEachObjectProcedure is used as a raw type here.
72 return new ForEachObjectProcedure(predicateKey, info, session.queryProvider2, user);
/**
 * Context-passing variant of compileForEachObject: wraps {@code user} into a
 * ForEachObjectContextProcedure that carries the relation's RelationInfo and
 * integer key. The context-taking forEachObjectCompiled casts its procedure
 * to that type.
 *
 * @param graph read graph used for the RelationInfo lookup
 * @param relation must be a ResourceImpl; its {@code id} becomes the predicate key
 * @param user procedure that is handed to the wrapper
 * @return the new ForEachObjectContextProcedure
 * @throws DatabaseException if the RelationInfo lookup fails
 */
77 public <C> SyncContextMultiProcedure<C, Resource> compileForEachObject(ReadGraph graph, final Resource relation, SyncContextMultiProcedure<C, Resource> user) throws DatabaseException {
79 RelationInfo info = getRelationInfo(graph, relation);
80 final int predicateKey = ((ResourceImpl)relation).id;
81 return new ForEachObjectContextProcedure<C>(predicateKey, info, session.queryProvider2, user);
/**
 * Wraps {@code user} into a ForPossibleRelatedValueProcedure that carries the
 * relation's RelationInfo and integer key. forPossibleRelatedValueCompiled casts
 * its procedure argument to that type.
 *
 * @param graph read graph used for the RelationInfo lookup
 * @param relation must be a ResourceImpl; its {@code id} becomes the predicate key
 * @param user procedure that is handed to the wrapper
 * @return the new ForPossibleRelatedValueProcedure
 * @throws DatabaseException if the RelationInfo lookup fails
 */
86 public <T> SyncProcedure<T> compilePossibleRelatedValue(ReadGraph graph, final Resource relation, SyncProcedure<T> user) throws DatabaseException {
88 RelationInfo info = getRelationInfo(graph, relation);
89 final int predicateKey = ((ResourceImpl)relation).id;
// Unlike the ForEachObject wrappers, no queryProvider2 argument is passed here.
90 return new ForPossibleRelatedValueProcedure<T>(predicateKey, info, user);
/**
 * Context-passing variant of compilePossibleRelatedValue: wraps {@code user} into
 * a ForPossibleRelatedValueContextProcedure that carries the relation's
 * RelationInfo and integer key. The context-taking forPossibleRelatedValueCompiled
 * casts its procedure argument to that type.
 *
 * @param graph read graph used for the RelationInfo lookup
 * @param relation must be a ResourceImpl; its {@code id} becomes the predicate key
 * @param user procedure that is handed to the wrapper
 * @return the new ForPossibleRelatedValueContextProcedure
 * @throws DatabaseException if the RelationInfo lookup fails
 */
95 public <C, T> SyncContextProcedure<C, T> compilePossibleRelatedValue(ReadGraph graph, final Resource relation, SyncContextProcedure<C, T> user) throws DatabaseException {
97 RelationInfo info = getRelationInfo(graph, relation);
98 final int predicateKey = ((ResourceImpl)relation).id;
99 return new ForPossibleRelatedValueContextProcedure<C, T>(predicateKey, info, user);
/**
 * Runs a compiled "for each object" query for {@code subject}. The only active
 * statement besides the casts is the call to session.querySupport.getObjects4;
 * every other line below is commented-out code.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject must be a non-null ResourceImpl (null is only checked by an assert)
 * @param procedure must be a ForEachObjectProcedure, e.g. one returned by
 *        compileForEachObject; any other type fails the cast
 */
104 public void forEachObjectCompiled(ReadGraph graph, Resource subject, final SyncMultiProcedure<Resource> procedure) {
106 assert(subject != null);
108 final ForEachObjectProcedure proc = (ForEachObjectProcedure)procedure;
109 // final RelationInfo info = proc.info;
111 final ReadGraphImpl impl = (ReadGraphImpl)graph;
112 final int subjectId = ((ResourceImpl)subject).id;
// The commented-out lines here and below appear to be the remains of a variant
// that compared the caller thread with a thread derived from the subject id and
// scheduled the query as a SessionTask when they differed.
114 // int callerThread = impl.callerThread;
115 // int suggestSchedule = (subjectId>>16) & queryProvider2.THREAD_MASK;
119 // if(callerThread == suggestSchedule) {
121 // if(info.isFunctional) {
122 // querySupport.getObjects4(impl, subjectId, proc);
// The query itself: executed directly with the caller's graph.
124 session.querySupport.getObjects4(impl, subjectId, proc);
129 // impl.state.barrier.inc();
130 // impl.state.barrier.dec(callerThread);
132 // queryProvider2.schedule(callerThread, new SessionTask(suggestSchedule) {
135 // public void run(int thread) {
137 // impl.state.barrier.inc(thread);
138 // impl.state.barrier.dec();
140 // if(info.isFunctional) {
141 // querySupport.getObjects4(impl.newAsync(thread), subjectId, proc);
143 // querySupport.getObjects4(impl.newAsync(thread), subjectId, proc);
149 // public String toString() {
/**
 * Context-passing variant of forEachObjectCompiled: runs the compiled query for
 * {@code subject} through session.querySupport.getObjects4 and hands
 * {@code context} along to it.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject must be a non-null ResourceImpl (null is only checked by an assert)
 * @param context caller-supplied value passed through to getObjects4
 * @param procedure must be a ForEachObjectContextProcedure, e.g. one returned by
 *        the context variant of compileForEachObject; any other type fails the cast
 */
160 public <C> void forEachObjectCompiled(ReadGraph graph, Resource subject, C context, final SyncContextMultiProcedure<C, Resource> procedure) {
162 assert(subject != null);
164 final ForEachObjectContextProcedure<C> proc = (ForEachObjectContextProcedure<C>)procedure;
165 final RelationInfo info = proc.info;
167 final ReadGraphImpl impl = (ReadGraphImpl)graph;
168 final int subjectId = ((ResourceImpl)subject).id;
170 // int callerThread = impl.callerThread;
171 // int suggestSchedule = (subjectId>>16) & queryProvider2.THREAD_MASK;
// NOTE(review): the two getObjects4 calls below are textually identical. If the
// second one is the else-branch of this test (the branch keyword is not visible
// here), info.isFunctional has no effect on which call is made.
175 if(info.isFunctional) {
176 session.querySupport.getObjects4(impl, subjectId, context, proc);
178 session.querySupport.getObjects4(impl, subjectId, context, proc);
/**
 * Looks up the value related to {@code subject} through the compiled relation
 * (getRelatedValue4) and delivers it with proc.execute. The visible catch
 * clauses show two error paths: a DatabaseException is forwarded to
 * proc.exception, and a DatabaseException raised by a callback is logged through
 * LOGGER with the message "Unexpected exception while handling related value".
 * NOTE(review): the try blocks that belong to these catch clauses are not
 * visible here; the nesting described above is the presumed one.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject must be a non-null ResourceImpl (null is only checked by an assert)
 * @param procedure must be a ForPossibleRelatedValueProcedure, e.g. one returned by
 *        compilePossibleRelatedValue; any other type fails the cast
 */
184 public <T> void forPossibleRelatedValueCompiled(ReadGraph graph, Resource subject, final SyncProcedure<T> procedure) {
186 assert(subject != null);
188 final ForPossibleRelatedValueProcedure<T> proc = (ForPossibleRelatedValueProcedure<T>)procedure;
190 final ReadGraphImpl impl = (ReadGraphImpl)graph;
191 final int subjectId = ((ResourceImpl)subject).id;
// Commented-out remains of a thread-scheduling variant; see also the disabled
// SessionTask code at the end of this method.
193 // int callerThread = impl.callerThread;
194 // int suggestSchedule = (subjectId>>16) & session.queryProvider2.THREAD_MASK;
198 // if(callerThread == suggestSchedule) {
200 // if(proc.info.isFunctional) {
// Synchronous lookup of the related value.
202 T result = getRelatedValue4(impl, subjectId, proc);
// Deliver the value; a failure inside the callback is only logged.
204 proc.execute(graph, result);
205 } catch (DatabaseException e2) {
206 LOGGER.error("Unexpected exception while handling related value", e2);
// Presumably the handler for a failed lookup: report it to the procedure.
208 } catch (DatabaseException e) {
210 proc.exception(graph, e);
// A failure of the exception callback itself is only logged.
211 } catch (DatabaseException e2) {
212 LOGGER.error("Unexpected exception while handling related value", e2);
216 // getRelatedValue4(impl, subjectId, proc);
221 // impl.state.barrier.inc();
222 // impl.state.barrier.dec(callerThread);
224 // queryProvider2.schedule(callerThread, new SessionTask(suggestSchedule) {
227 // public void run(int thread) {
229 // impl.state.barrier.inc(thread);
230 // impl.state.barrier.dec();
232 // if(info.isFunctional) {
233 // getRelatedValue4(impl.newAsync(thread), subjectId, proc);
235 // getRelatedValue4(impl.newAsync(thread), subjectId, proc);
241 // public String toString() {
/**
 * Context-passing variant of forPossibleRelatedValueCompiled: looks up the value
 * related to {@code subject} with getRelatedValue4 and delivers it, together with
 * {@code context}, through proc.execute.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject must be a non-null ResourceImpl (null is only checked by an assert)
 * @param context caller-supplied value passed through to the lookup and the procedure
 * @param procedure must be a ForPossibleRelatedValueContextProcedure, e.g. one
 *        returned by the context variant of compilePossibleRelatedValue
 */
252 public <C, T> void forPossibleRelatedValueCompiled(ReadGraph graph, Resource subject, C context, final SyncContextProcedure<C, T> procedure) {
254 assert(subject != null);
256 final ForPossibleRelatedValueContextProcedure<C, T> proc = (ForPossibleRelatedValueContextProcedure<C, T>)procedure;
258 final ReadGraphImpl impl = (ReadGraphImpl)graph;
259 final int subjectId = ((ResourceImpl)subject).id;
261 // int callerThread = impl.callerThread;
262 // int suggestSchedule = (subjectId>>16) & session.queryProvider2.THREAD_MASK;
266 // if(proc.info.isFunctional) {
268 // getRelatedValue4(impl, subjectId, context, proc);
272 T result = getRelatedValue4(impl, subjectId, context, proc);
273 proc.execute(graph, context, result);
// NOTE(review): a DatabaseException is turned into a null result here; it is
// neither logged nor passed to an exception callback, unlike in the
// SyncProcedure variant of this method.
274 } catch (DatabaseException e) {
275 proc.execute(graph, context, null);
// Delivers to 'procedure' a ResourceImpl built from the "complete object key"
// that the subject's cluster reports for the subject. If the cluster is not
// loaded, the same work is handed to session.queryProvider2.requestCluster as a
// Runnable; otherwise it is done inline.
// Unlike forPossibleDirectType, no getCompleteType check is visible here.
// NOTE(review): AsyncReadGraph and AsyncProcedure are not among the imports
// visible in this file, and the bodies of the catch clauses are not visible
// either - this method may be disabled/legacy code; confirm before relying on it.
281 public <T> void forPossibleType(final AsyncReadGraph graph, Resource subject, final AsyncProcedure<Resource> procedure) {
283 assert(subject != null);
285 final ReadGraphImpl impl = (ReadGraphImpl)graph;
286 final int subjectId = ((ResourceImpl)subject).id;
290 final ClusterI cluster = session.clusterTable.getClusterByResourceKey(subjectId);
291 if(!cluster.isLoaded()) {
293 // impl.state.inc(0);
// Deferred path: the Runnable is given to requestCluster together with the cluster id.
295 session.queryProvider2.requestCluster(impl, cluster.getClusterId(), new Runnable() {
302 int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator);
303 procedure.execute(graph, new ResourceImpl(session.resourceSupport, result));
305 // impl.state.dec(0);
307 } catch (DatabaseException e) {
// Presumably the already-loaded path: same lookup, executed inline.
317 int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator);
318 procedure.execute(graph, new ResourceImpl(session.resourceSupport, result));
322 } catch (DatabaseException e) {
/**
 * Reports the "complete object" of {@code subject} to {@code procedure} when the
 * subject's cluster says its complete type is InstanceOf, and {@code null}
 * otherwise. If the cluster is not loaded the same logic is handed to
 * session.queryProvider2.requestCluster as a Runnable; otherwise it runs inline.
 *
 * Error handling on the visible lines: inside the Runnable a DatabaseException is
 * only logged; on the outer path a DatabaseException yields a {@code null}
 * result, and any other Throwable is logged and also yields {@code null}.
 *
 * @param graph must be a ReadGraphImpl; it is cast unconditionally
 * @param subject must be a non-null ResourceImpl (null is only checked by an assert)
 * @param context caller-supplied value passed through to the procedure
 * @param procedure receives a new ResourceImpl or {@code null}
 */
333 public <C> void forPossibleDirectType(final ReadGraph graph, Resource subject, final C context, final SyncContextProcedure<C, Resource> procedure) {
335 assert(subject != null);
337 final ReadGraphImpl impl = (ReadGraphImpl)graph;
338 final int subjectId = ((ResourceImpl)subject).id;
342 final ClusterI cluster = session.clusterTable.getClusterByResourceKey(subjectId);
343 if(!cluster.isLoaded()) {
345 // impl.state.inc(0);
// Deferred path: the Runnable is given to requestCluster together with the cluster id.
347 session.queryProvider2.requestCluster(impl, cluster.getClusterId(), new Runnable() {
354 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subjectId, session.clusterTranslator);
355 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
356 int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator);
357 procedure.execute(graph, context, new ResourceImpl(session.resourceSupport, result));
// Any other complete type: report "no direct type".
359 procedure.execute(graph, context, null);
362 // impl.state.dec(0);
// NOTE(review): on this path the procedure is not invoked on the visible lines
// when a DatabaseException occurs; the failure is only logged.
364 } catch (DatabaseException e) {
365 LOGGER.error("forPossibleDirectType requestCluster callback failed", e);
// Presumably the already-loaded path: same decision, executed inline.
374 ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subjectId, session.clusterTranslator);
375 if(ClusterI.CompleteTypeEnum.InstanceOf == type) {
376 int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator);
377 procedure.execute(graph, context, new ResourceImpl(session.resourceSupport, result));
379 procedure.execute(graph, context, null);
// A database failure is reported to the caller as "no direct type".
384 } catch (DatabaseException e) {
386 procedure.execute(graph, context, null);
// Anything else is logged before reporting "no direct type".
388 } catch (Throwable t) {
390 LOGGER.error("forPossibleDirectType failed unexpectedly", t);
391 procedure.execute(graph, context, null);
/**
 * Context variant: resolves the single object that {@code subject} has for the
 * procedure's predicate and returns that object's value.
 *
 * Two code paths are visible. The first one consults only the virtual graph
 * providers and ends in getValue4 with a null container cluster. The second one
 * looks up the subject's cluster, optionally consults the virtual graph providers,
 * and ends in getRelatedDirectValue4. Both paths re-invoke this method after
 * asking for virtual statements to be loaded.
 * NOTE(review): the conditions selecting the paths, the declaration of 'result'
 * and the code between the two cluster checks are not visible here.
 *
 * @param graph graph implementation passed through to the helpers
 * @param subject integer key of the subject resource
 * @param context caller-supplied value passed through to the helpers
 * @param procedure compiled procedure; supplies predicateKey
 * @return the value produced by getValue4 / getRelatedDirectValue4
 * @throws DatabaseException "Multiple objects" or "No objects for ..." as thrown
 *         below, or whatever the helpers throw
 */
397 private <C, T> T getRelatedValue4(final ReadGraphImpl graph, final int subject, final C context, final ForPossibleRelatedValueContextProcedure<C, T> procedure) throws DatabaseException {
401 final int predicate = procedure.predicateKey;
// Make sure the virtual statements for (subject, predicate) are loaded, then retry.
405 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) {
406 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {});
407 return getRelatedValue4(graph, subject, context, procedure);
// Scan every virtual graph provider for objects of (subject, predicate).
410 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
411 for (int id : g.getObjects(subject, predicate)) {
413 throw new DatabaseException("Multiple objects");
421 throw new DatabaseException("No objects for " + subject );
// No container cluster is known on this path, hence the null argument.
423 return getValue4(graph, null, result, context, procedure);
428 final org.simantics.db.procore.cluster.ClusterImpl cluster = session.clusterTable.getClusterByResourceKey(subject);
429 if(!cluster.isLoaded()) {
// NOTE(review): presumably a cluster load precedes this retry; that line is not visible.
431 return getRelatedValue4(graph, subject, context, procedure);
// The cluster has virtual content and the subject is registered as virtual:
// consult the virtual providers before the cluster itself.
434 if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) {
436 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) {
437 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {});
438 return getRelatedValue4(graph, subject, context, procedure);
441 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
442 for (int id : g.getObjects(subject, predicate)) {
444 throw new DatabaseException("Multiple objects");
// Hand over the object found so far ('result') ...
451 return getRelatedDirectValue4(graph, cluster, subject, result, context, procedure);
// ... or 0 when the virtual providers were not consulted.
455 return getRelatedDirectValue4(graph, cluster, subject, 0, context, procedure);
/**
 * Returns the value attached to {@code subject}, taken either from the virtual
 * graph providers or from the subject's cluster via getDirectValue4.
 *
 * Two code paths are visible: one that consults only the virtual graph providers,
 * and one that resolves the cluster (re-using {@code containerCluster} when it
 * contains the subject), optionally consults the virtual providers, and finally
 * reads the value from the cluster according to its type (SMALL or otherwise,
 * cast to ClusterBig).
 * NOTE(review): the conditions selecting the paths and the handling of 'value'
 * and 'result' are not visible here. getRelatedValue4 passes null as
 * containerCluster on one path, while containerCluster.contains(...) below would
 * fail for null - presumably that line is not reached in that case; confirm.
 *
 * @param graph graph implementation, passed to the virtual statement loader and getDirectValue4
 * @param containerCluster cluster of the statement's subject; see the note above regarding null
 * @param subject integer key of the resource whose value is read
 * @param procedure compiled procedure; only passed along on recursion in the visible lines
 * @return the value cast to T
 * @throws DatabaseException "Multiple values" as thrown below, or whatever the helpers throw
 */
461 @SuppressWarnings("unchecked")
462 private <T> T getValue4(final ReadGraphImpl graph, final ClusterImpl containerCluster, final int subject, final ForPossibleRelatedValueProcedure<T> procedure) throws DatabaseException {
464 Object result = null;
// Make sure the virtual statements of the subject are loaded, then retry.
468 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) {
469 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {});
470 return getValue4(graph, containerCluster, subject, procedure);
// Ask every virtual graph provider for a value of the subject.
473 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
474 Object value = g.getValue(subject);
477 throw new DatabaseException("Multiple values");
484 // FIXME: throw something here instead
// Start from the container cluster and switch to the subject's own cluster if needed.
489 ClusterImpl cluster = containerCluster;
490 if(!containerCluster.contains(subject)) {
491 cluster = session.clusterTable.getClusterByResourceKey(subject);
492 if(!cluster.isLoaded()) {
// NOTE(review): presumably a cluster load precedes this retry; that line is not visible.
494 return getValue4(graph, containerCluster, subject, procedure);
498 if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) {
500 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) {
501 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {});
502 return getValue4(graph, containerCluster, subject, procedure);
505 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
506 Object value = g.getValue(subject);
509 throw new DatabaseException("Multiple values");
// Read the value from the cluster; the cluster type decides which overload is used.
519 if(ClusterTypeEnum.SMALL == cluster.getType())
520 return getDirectValue4(graph, (ClusterSmall)cluster, subject);
522 return getDirectValue4(graph, (ClusterBig)cluster, subject);
// Same dispatch again, presumably for the branch without virtual statements.
527 if(ClusterTypeEnum.SMALL == cluster.getType())
528 return getDirectValue4(graph, (ClusterSmall)cluster, subject);
530 return getDirectValue4(graph, (ClusterBig)cluster, subject);
/**
 * Context variant of getValue4: returns the value attached to {@code subject},
 * taken either from the virtual graph providers or from the subject's cluster via
 * getDirectValue4. The visible lines mirror the variant without context;
 * {@code context} is only passed along on recursion.
 * NOTE(review): the conditions selecting the paths and the handling of 'value'
 * and 'result' are not visible here. getRelatedValue4 passes null as
 * containerCluster on one path, while containerCluster.contains(...) below would
 * fail for null - presumably that line is not reached in that case; confirm.
 *
 * @param graph graph implementation, passed to the virtual statement loader and getDirectValue4
 * @param containerCluster cluster of the statement's subject; see the note above regarding null
 * @param subject integer key of the resource whose value is read
 * @param context caller-supplied value, passed along on recursion
 * @param procedure compiled procedure, passed along on recursion
 * @return the value cast to T
 * @throws DatabaseException "Multiple values" as thrown below, or whatever the helpers throw
 */
536 @SuppressWarnings("unchecked")
537 private <C, T> T getValue4(final ReadGraphImpl graph, final ClusterImpl containerCluster, final int subject, final C context, final ForPossibleRelatedValueContextProcedure<C, T> procedure) throws DatabaseException {
539 Object result = null;
// Make sure the virtual statements of the subject are loaded, then retry.
543 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) {
544 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {});
545 return getValue4(graph, containerCluster, subject, context, procedure);
// Ask every virtual graph provider for a value of the subject.
548 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
549 Object value = g.getValue(subject);
552 throw new DatabaseException("Multiple values");
559 // FIXME: throw something here instead
// Start from the container cluster and switch to the subject's own cluster if needed.
564 ClusterImpl cluster = containerCluster;
565 if(!containerCluster.contains(subject)) {
566 cluster = session.clusterTable.getClusterByResourceKey(subject);
567 if(!cluster.isLoaded()) {
// NOTE(review): presumably a cluster load precedes this retry; that line is not visible.
569 return getValue4(graph, containerCluster, subject, context, procedure);
573 if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) {
575 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) {
576 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {});
577 return getValue4(graph, containerCluster, subject, context, procedure);
580 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
581 Object value = g.getValue(subject);
584 throw new DatabaseException("Multiple values");
// Read the value from the cluster; the cluster type decides which overload is used.
594 if(ClusterTypeEnum.SMALL == cluster.getType())
595 return getDirectValue4(graph, (ClusterSmall)cluster, subject);
597 return getDirectValue4(graph, (ClusterBig)cluster, subject);
// Same dispatch again, presumably for the branch without virtual statements.
602 if(ClusterTypeEnum.SMALL == cluster.getType())
603 return getDirectValue4(graph, (ClusterSmall)cluster, subject);
605 return getDirectValue4(graph, (ClusterBig)cluster, subject);
/**
 * Combines the single object stored in {@code cluster} for the procedure's
 * predicate ({@code so}) with an object already found elsewhere ({@code result})
 * and returns the value of whichever one applies via getValue4.
 * NOTE(review): the conditions that choose between the four outcomes below are
 * not visible here; the messages suggest "neither found", "only result",
 * "only so" and "both found" respectively - confirm.
 *
 * @param graph graph implementation passed through to getValue4
 * @param cluster cluster that is asked for the subject's single object
 * @param subject integer key of the subject resource
 * @param result object key found by the caller; callers in this file pass 0 when
 *        they did not consult the virtual graph providers
 * @param procedure compiled procedure; passed to getSingleObject and supplies predicateKey
 * @return the value produced by getValue4
 * @throws DatabaseException with message "No objects ..." or "Multiple objects",
 *         or whatever getSingleObject / getValue4 throw
 */
611 private <T> T getRelatedDirectValue4(final ReadGraphImpl graph, final ClusterImpl cluster, final int subject, final int result, final ForPossibleRelatedValueProcedure<T> procedure) throws DatabaseException {
613 int so = cluster.getSingleObject(subject, procedure, session.clusterTranslator);
616 throw new DatabaseException("No objects " + subject + " " + procedure.predicateKey);
618 return getValue4(graph, cluster, result, procedure);
622 return getValue4(graph, cluster, so, procedure);
624 throw new DatabaseException("Multiple objects");
/**
 * Context variant of getRelatedDirectValue4: combines the single object stored in
 * {@code cluster} for the procedure's predicate ({@code so}) with an object
 * already found elsewhere ({@code result}) and returns the value of whichever one
 * applies via getValue4. Unlike the variant without context, the failures are
 * reported as NoSingleResultException, with {@code result} as second constructor argument.
 * NOTE(review): the conditions that choose between the four outcomes below are
 * not visible here; the messages suggest "neither found", "only result",
 * "only so" and "both found" respectively - confirm.
 *
 * @param graph graph implementation passed through to getValue4
 * @param cluster cluster that is asked for the subject's single object
 * @param subject integer key of the subject resource
 * @param result object key found by the caller; callers in this file pass 0 when
 *        they did not consult the virtual graph providers
 * @param context caller-supplied value passed through to getValue4
 * @param procedure compiled procedure; passed to getSingleObject and supplies predicateKey
 * @return the value produced by getValue4
 * @throws DatabaseException NoSingleResultException as thrown below, or whatever
 *         getSingleObject / getValue4 throw
 */
630 private <C, T> T getRelatedDirectValue4(final ReadGraphImpl graph, final ClusterImpl cluster, final int subject, final int result, final C context, final ForPossibleRelatedValueContextProcedure<C, T> procedure) throws DatabaseException {
632 int so = cluster.getSingleObject(subject, procedure, session.clusterTranslator);
635 throw new NoSingleResultException("No objects " + subject + " " + procedure.predicateKey, result);
637 return getValue4(graph, cluster, result, context, procedure);
641 return getValue4(graph, cluster, so, context, procedure);
643 throw new NoSingleResultException("Multiple objects for " + subject + " " + procedure.predicateKey, result);
/**
 * Resolves the single object that {@code subject} has for the procedure's
 * predicate and returns that object's value.
 *
 * Two code paths are visible. The first one consults only the virtual graph
 * providers and ends in getValue4 with a null container cluster. The second one
 * looks up the subject's cluster, optionally consults the virtual graph providers,
 * and ends in getRelatedDirectValue4. Both paths re-invoke this method after
 * asking for virtual statements to be loaded.
 * NOTE(review): the conditions selecting the paths, the declaration of 'result'
 * and the code between the two cluster checks are not visible here.
 *
 * @param graph graph implementation passed through to the helpers
 * @param subject integer key of the subject resource
 * @param procedure compiled procedure; supplies predicateKey
 * @return the value produced by getValue4 / getRelatedDirectValue4
 * @throws DatabaseException "Multiple objects" or "No objects for ..." as thrown
 *         below, or whatever the helpers throw
 */
649 public <T> T getRelatedValue4(final ReadGraphImpl graph, final int subject, final ForPossibleRelatedValueProcedure<T> procedure) throws DatabaseException {
653 final int predicate = procedure.predicateKey;
// Make sure the virtual statements for (subject, predicate) are loaded, then retry.
657 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) {
658 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {});
659 return getRelatedValue4(graph, subject, procedure);
// Scan every virtual graph provider for objects of (subject, predicate).
662 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
663 for (int id : g.getObjects(subject, predicate)) {
665 throw new DatabaseException("Multiple objects");
673 throw new DatabaseException("No objects for " + subject );
// No container cluster is known on this path, hence the null argument.
675 return getValue4(graph, null, result, procedure);
680 final org.simantics.db.procore.cluster.ClusterImpl cluster = session.clusterTable.getClusterByResourceKey(subject);
681 if(!cluster.isLoaded()) {
// NOTE(review): presumably a cluster load precedes this retry; that line is not visible.
683 return getRelatedValue4(graph, subject, procedure);
// The cluster has virtual content and the subject is registered as virtual:
// consult the virtual providers before the cluster itself.
686 if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) {
688 if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) {
689 SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {});
690 return getRelatedValue4(graph, subject, procedure);
693 for(TransientGraph g : session.virtualGraphServerSupport.providers) {
694 for (int id : g.getObjects(subject, predicate)) {
696 throw new DatabaseException("Multiple objects");
// Hand over the object found so far ('result') ...
703 return getRelatedDirectValue4(graph, cluster, subject, result, procedure);
// ... or 0 when the virtual providers were not consulted.
707 return getRelatedDirectValue4(graph, cluster, subject, 0, procedure);
// Callback-style variant: reads the raw value bytes of 'subject' from a small
// cluster, decodes them with utf(...) and passes the result (or null when a
// DatabaseException occurs) to procedure.execute together with 'context'.
// NOTE(review): this calls a one-argument utf(byte[]) that is not visible in
// this file (only utf(byte[], int) is), and no caller of this overload is
// visible - it may be disabled/legacy code; confirm.
714 private <C, T> void getDirectValue4(final ReadGraphImpl graph, final ClusterSmall cluster, final int subject, final C context, final ForPossibleRelatedValueContextProcedure<C, T> procedure) {
717 byte[] bytes = cluster.getValue(subject, session.clusterTranslator);
718 T value = (T)utf(bytes);
719 procedure.execute(graph, context, value);
// The exception is not logged; the procedure just receives null.
720 } catch (DatabaseException e) {
721 procedure.execute(graph, context, null);
/**
 * Reads the value of {@code subject} from a big cluster as a byte array and
 * decodes it as a string starting at offset 0. utf(...) returns null for a null array.
 *
 * @param graph not used on the visible lines
 * @param cluster cluster that holds the value
 * @param subject integer key of the resource whose value is read
 * @return the decoded String, cast unchecked to T
 * @throws DatabaseException from cluster.getValue or from the decoding
 */
729 @SuppressWarnings("unchecked")
730 private <T> T getDirectValue4(final ReadGraphImpl graph, final ClusterBig cluster, final int subject) throws DatabaseException {
732 byte[] bytes = cluster.getValue(subject, session.clusterTranslator);
733 return (T) utf(bytes, 0);
/**
 * Reads the value of {@code subject} from a small cluster by indexing the
 * cluster's resource and value tables directly, and decodes it as a string.
 *
 * @param graph not used on the visible lines
 * @param cluster small cluster whose resourceTable and valueTable are read
 * @param subject integer key of the resource whose value is read
 * @return the decoded String, cast unchecked to T
 * @throws DatabaseException "No value for ..." (presumably when the stored size
 *         is 0 - the condition is not visible), or a decoding failure from utf
 */
737 @SuppressWarnings("unchecked")
738 private <T> T getDirectValue4(final ReadGraphImpl graph, final ClusterSmall cluster, final int subject) throws DatabaseException {
739 // Note: this code avoids creating an intermediate byte[]
740 // to store the encoded string bytes and reads the UTF string
741 // from the value table byte[] directly into String instead.
743 ResourceTableSmall rt = cluster.resourceTable;
744 ValueTableSmall vt = cluster.valueTable;
746 byte[] bs = vt.table;
747 long[] ls = rt.table;
// The low 12 bits of the subject key select the entry; the index is doubled,
// i.e. presumably two longs are stored per resource - TODO confirm table layout.
749 int index = ((subject&0xFFF) << 1) - 1 + rt.offset;
// A 22-bit field starting at bit 24 of that entry locates the value in the value table.
750 int valueIndex = ((int)(ls[index] >>> 24) & 0x3FFFFF) + vt.offset;
// Size prefix: one byte, or two bytes when the first byte is negative (high bit set).
752 int size = bs[valueIndex++];
753 if (size < 0) // two byte size
754 size = (int)(((size & 0x7F) << 8) | (bs[valueIndex++] & 0xFF));
756 throw new DatabaseException("No value for " + subject);
// Decoding starts right after the size prefix; utf reads its own length header.
758 return (T) utf(bs, valueIndex);
/**
 * Decodes a string stored in {@code bytes} starting at {@code offset}: first a
 * variable-length length header (Databoard "Length" encoding, see the link
 * below), then the character data in the UTF-8 variant handled by the decoding
 * loop copied from DataInputStream (1-, 2- and 3-byte sequences).
 * NOTE(review): the conditions that select the header branches and the
 * switch/case labels of the decoding loop are not visible here; the inline notes
 * below describe only what the visible lines do.
 *
 * @param bytes source array; {@code null} yields {@code null}
 * @param offset position of the length header
 * @return the decoded String, or null if {@code bytes} is null
 * @throws AssumptionException on malformed or truncated character data
 */
761 private static final String utf(byte[] bytes, int offset) throws AssumptionException {
763 if(bytes == null) return null;
765 // Read databoard int32 using Length encoding
766 // https://dev.simantics.org/index.php/Databoard_Specification#Length
// First header byte, read as unsigned.
768 int length = bytes[index++]&0xff;
// Header form with four extra bytes (three are added with shifts 3/11/19 on the
// visible lines), followed by a constant bias.
774 length += ((bytes[index++]&0xff)<<3);
775 length += ((bytes[index++]&0xff)<<11);
776 length += ((bytes[index++]&0xff)<<19);
777 length += 0x10204080;
// Header form whose extra bytes are added with shifts 4/12/20.
781 length += ((bytes[index++]&0xff)<<4);
782 length += ((bytes[index++]&0xff)<<12);
783 length += ((bytes[index++]&0xff)<<20);
// Header form whose extra bytes are added with shifts 5/13.
789 length += ((bytes[index++]&0xff)<<5);
790 length += ((bytes[index++]&0xff)<<13);
// Header form with one extra byte added with shift 6.
796 length += ((bytes[index++]&0xff)<<6);
801 // Copied from DataInputStream
// Output buffer; utflen is declared on a line that is not visible here
// (presumably equal to length).
803 char[] chararr = new char[utflen];
// End position (exclusive) of the encoded character data.
807 int target = index + length;
// First pass: presumably a fast path that copies leading single-byte characters
// and stops at the first byte that needs multi-byte handling.
810 while (count < target) {
811 c = (int) bytes[count] & 0xff;
814 chararr[chararr_count++]=(char)c;
// Second pass: general decoding of the remaining data.
817 while (count < target) {
818 c = (int) bytes[count] & 0xff;
820 case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7:
823 chararr[chararr_count++]=(char)c;
826 /* 110x xxxx 10xx xxxx*/
829 throw new AssumptionException(
830 "malformed input: partial character at end (" + (count-index) + " > " + utflen + ")");
// count has presumably already been advanced past the 2-byte sequence, so the
// continuation byte is the one just before it.
831 char2 = (int) bytes[count-1];
// A continuation byte must have the form 10xx xxxx.
832 if ((char2 & 0xC0) != 0x80)
833 throw new AssumptionException(
834 "malformed input around byte " + count);
835 chararr[chararr_count++]=(char)(((c & 0x1F) << 6) |
839 /* 1110 xxxx 10xx xxxx 10xx xxxx */
842 throw new AssumptionException(
843 "malformed input: partial character at end (" + (count-index) + " > " + utflen + ")");
844 char2 = (int) bytes[count-2];
845 char3 = (int) bytes[count-1];
846 if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80))
847 throw new AssumptionException(
848 "malformed input around byte " + (count-1));
// Combine 4 + 6 + 6 payload bits into one char.
849 chararr[chararr_count++]=(char)(((c & 0x0F) << 12) |
850 ((char2 & 0x3F) << 6) |
851 ((char3 & 0x3F) << 0));
854 /* 10xx xxxx, 1111 xxxx */
855 throw new AssumptionException(
856 "malformed input around byte " + count);
860 // The number of chars produced may be less than utflen
861 return new String(chararr, 0, chararr_count);