package fi.vtt.simantics.procore.internal; import org.simantics.db.DirectStatements; import org.simantics.db.ReadGraph; import org.simantics.db.RelationInfo; import org.simantics.db.Resource; import org.simantics.db.exception.AssumptionException; import org.simantics.db.exception.DatabaseException; import org.simantics.db.exception.NoSingleResultException; import org.simantics.db.impl.ClusterI; import org.simantics.db.impl.ClusterI.ClusterTypeEnum; import org.simantics.db.impl.ForEachObjectContextProcedure; import org.simantics.db.impl.ForEachObjectProcedure; import org.simantics.db.impl.ForPossibleRelatedValueContextProcedure; import org.simantics.db.impl.ForPossibleRelatedValueProcedure; import org.simantics.db.impl.ResourceImpl; import org.simantics.db.impl.TransientGraph; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.procedure.SyncContextMultiProcedure; import org.simantics.db.procedure.SyncContextProcedure; import org.simantics.db.procedure.SyncMultiProcedure; import org.simantics.db.procedure.SyncProcedure; import org.simantics.db.procore.cluster.ClusterBig; import org.simantics.db.procore.cluster.ClusterImpl; import org.simantics.db.procore.cluster.ClusterSmall; import org.simantics.db.procore.cluster.ResourceTableSmall; import org.simantics.db.procore.cluster.ValueTableSmall; import org.simantics.db.service.DirectQuerySupport; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * Note that the direct value retrieval in this implementation only supports * String-type literals - nothing else! * * This implementation is mainly intended for optimizing database indexing * performance. */ public class DirectQuerySupportImpl implements DirectQuerySupport { private static final Logger LOGGER = LoggerFactory.getLogger(DirectQuerySupportImpl.class); final private SessionImplSocket session; DirectQuerySupportImpl(SessionImplSocket session) { this.session = session; } @Override final public DirectStatements getDirectPersistentStatements(ReadGraph graph, final Resource subject) { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.processor.getDirectStatements(impl, subject, true); } @Override final public DirectStatements getDirectStatements(ReadGraph graph, final Resource subject) { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.processor.getDirectStatements(impl, subject, false); } @Override public RelationInfo getRelationInfo(ReadGraph graph, Resource subject) throws DatabaseException { ReadGraphImpl impl = (ReadGraphImpl)graph; return impl.processor.getRelationInfo(impl, subject); } @Override public SyncMultiProcedure compileForEachObject(ReadGraph graph, final Resource relation, SyncMultiProcedure user) throws DatabaseException { RelationInfo info = getRelationInfo(graph, relation); final int predicateKey = ((ResourceImpl)relation).id; return new ForEachObjectProcedure(predicateKey, info, session.queryProvider2, user); } @Override public SyncContextMultiProcedure compileForEachObject(ReadGraph graph, final Resource relation, SyncContextMultiProcedure user) throws DatabaseException { RelationInfo info = getRelationInfo(graph, relation); final int predicateKey = ((ResourceImpl)relation).id; return new ForEachObjectContextProcedure(predicateKey, info, session.queryProvider2, user); } @Override public SyncProcedure compilePossibleRelatedValue(ReadGraph graph, final Resource relation, SyncProcedure user) throws DatabaseException { RelationInfo info = getRelationInfo(graph, relation); final int predicateKey = ((ResourceImpl)relation).id; return new ForPossibleRelatedValueProcedure(predicateKey, info, user); } @Override public SyncContextProcedure compilePossibleRelatedValue(ReadGraph graph, final Resource relation, SyncContextProcedure user) throws DatabaseException { RelationInfo info = getRelationInfo(graph, relation); final int predicateKey = ((ResourceImpl)relation).id; return new ForPossibleRelatedValueContextProcedure(predicateKey, info, user); } @Override public void forEachObjectCompiled(ReadGraph graph, Resource subject, final SyncMultiProcedure procedure) { assert(subject != null); final ForEachObjectProcedure proc = (ForEachObjectProcedure)procedure; // final RelationInfo info = proc.info; final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; // int callerThread = impl.callerThread; // int suggestSchedule = (subjectId>>16) & queryProvider2.THREAD_MASK; // impl.inc(); // if(callerThread == suggestSchedule) { // if(info.isFunctional) { // querySupport.getObjects4(impl, subjectId, proc); // } else { session.querySupport.getObjects4(impl, subjectId, proc); // } // } else { // // impl.state.barrier.inc(); // impl.state.barrier.dec(callerThread); // // queryProvider2.schedule(callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // // impl.state.barrier.inc(thread); // impl.state.barrier.dec(); // // if(info.isFunctional) { // querySupport.getObjects4(impl.newAsync(thread), subjectId, proc); // } else { // querySupport.getObjects4(impl.newAsync(thread), subjectId, proc); // } // // } // // @Override // public String toString() { // return "gaff8"; // } // // }); // // } } @Override public void forEachObjectCompiled(ReadGraph graph, Resource subject, C context, final SyncContextMultiProcedure procedure) { assert(subject != null); final ForEachObjectContextProcedure proc = (ForEachObjectContextProcedure)procedure; final RelationInfo info = proc.info; final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; // int callerThread = impl.callerThread; // int suggestSchedule = (subjectId>>16) & queryProvider2.THREAD_MASK; // impl.inc(); if(info.isFunctional) { session.querySupport.getObjects4(impl, subjectId, context, proc); } else { session.querySupport.getObjects4(impl, subjectId, context, proc); } } @Override public void forPossibleRelatedValueCompiled(ReadGraph graph, Resource subject, final SyncProcedure procedure) { assert(subject != null); final ForPossibleRelatedValueProcedure proc = (ForPossibleRelatedValueProcedure)procedure; final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; // int callerThread = impl.callerThread; // int suggestSchedule = (subjectId>>16) & session.queryProvider2.THREAD_MASK; // impl.inc(); // if(callerThread == suggestSchedule) { // if(proc.info.isFunctional) { try { T result = getRelatedValue4(impl, subjectId, proc); try { proc.execute(graph, result); } catch (DatabaseException e2) { LOGGER.error("Unexpected exception while handling related value", e2); } } catch (DatabaseException e) { try { proc.exception(graph, e); } catch (DatabaseException e2) { LOGGER.error("Unexpected exception while handling related value", e2); } } // } else { // getRelatedValue4(impl, subjectId, proc); // } // } else { // // impl.state.barrier.inc(); // impl.state.barrier.dec(callerThread); // // queryProvider2.schedule(callerThread, new SessionTask(suggestSchedule) { // // @Override // public void run(int thread) { // // impl.state.barrier.inc(thread); // impl.state.barrier.dec(); // // if(info.isFunctional) { // getRelatedValue4(impl.newAsync(thread), subjectId, proc); // } else { // getRelatedValue4(impl.newAsync(thread), subjectId, proc); // } // // } // // @Override // public String toString() { // return "gaff11"; // } // // }); // // } } @Override public void forPossibleRelatedValueCompiled(ReadGraph graph, Resource subject, C context, final SyncContextProcedure procedure) { assert(subject != null); final ForPossibleRelatedValueContextProcedure proc = (ForPossibleRelatedValueContextProcedure)procedure; final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; // int callerThread = impl.callerThread; // int suggestSchedule = (subjectId>>16) & session.queryProvider2.THREAD_MASK; // impl.inc(); // if(proc.info.isFunctional) { // } else { // getRelatedValue4(impl, subjectId, context, proc); // } try { T result = getRelatedValue4(impl, subjectId, context, proc); proc.execute(graph, context, result); } catch (DatabaseException e) { proc.execute(graph, context, null); } } /* @Override public void forPossibleType(final AsyncReadGraph graph, Resource subject, final AsyncProcedure procedure) { assert(subject != null); final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; try { final ClusterI cluster = session.clusterTable.getClusterByResourceKey(subjectId); if(!cluster.isLoaded()) { // impl.state.inc(0); session.queryProvider2.requestCluster(impl, cluster.getClusterId(), new Runnable() { @Override public void run() { try { int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator); procedure.execute(graph, new ResourceImpl(session.resourceSupport, result)); // impl.state.dec(0); } catch (DatabaseException e) { e.printStackTrace(); } } }); } else { int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator); procedure.execute(graph, new ResourceImpl(session.resourceSupport, result)); } } catch (DatabaseException e) { e.printStackTrace(); } } */ @Override public void forPossibleDirectType(final ReadGraph graph, Resource subject, final C context, final SyncContextProcedure procedure) { assert(subject != null); final ReadGraphImpl impl = (ReadGraphImpl)graph; final int subjectId = ((ResourceImpl)subject).id; try { final ClusterI cluster = session.clusterTable.getClusterByResourceKey(subjectId); if(!cluster.isLoaded()) { // impl.state.inc(0); session.queryProvider2.requestCluster(impl, cluster.getClusterId(), new Runnable() { @Override public void run() { try { ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subjectId, session.clusterTranslator); if(ClusterI.CompleteTypeEnum.InstanceOf == type) { int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator); procedure.execute(graph, context, new ResourceImpl(session.resourceSupport, result)); } else { procedure.execute(graph, context, null); } // impl.state.dec(0); } catch (DatabaseException e) { LOGGER.error("forPossibleDirectType requestCluster callback failed", e); } } }); } else { ClusterI.CompleteTypeEnum type = cluster.getCompleteType(subjectId, session.clusterTranslator); if(ClusterI.CompleteTypeEnum.InstanceOf == type) { int result = cluster.getCompleteObjectKey(subjectId, session.clusterTranslator); procedure.execute(graph, context, new ResourceImpl(session.resourceSupport, result)); } else { procedure.execute(graph, context, null); } } } catch (DatabaseException e) { procedure.execute(graph, context, null); } catch (Throwable t) { LOGGER.error("forPossibleDirectType failed unexpectedly", t); procedure.execute(graph, context, null); } } private T getRelatedValue4(final ReadGraphImpl graph, final int subject, final C context, final ForPossibleRelatedValueContextProcedure procedure) throws DatabaseException { int result = 0; final int predicate = procedure.predicateKey; if(subject < 0) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {}); return getRelatedValue4(graph, subject, context, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { for (int id : g.getObjects(subject, predicate)) { if(result != 0) { throw new DatabaseException("Multiple objects"); } else { result = id; } } } if(result == 0) { throw new DatabaseException("No objects for " + subject ); } else { return getValue4(graph, null, result, context, procedure); } } final org.simantics.db.procore.cluster.ClusterImpl cluster = session.clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(); return getRelatedValue4(graph, subject, context, procedure); } if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {}); return getRelatedValue4(graph, subject, context, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { for (int id : g.getObjects(subject, predicate)) { if(result != 0) { throw new DatabaseException("Multiple objects"); } else { result = id; } } } return getRelatedDirectValue4(graph, cluster, subject, result, context, procedure); } else { return getRelatedDirectValue4(graph, cluster, subject, 0, context, procedure); } } @SuppressWarnings("unchecked") private T getValue4(final ReadGraphImpl graph, final ClusterImpl containerCluster, final int subject, final ForPossibleRelatedValueProcedure procedure) throws DatabaseException { Object result = null; if(subject < 0) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {}); return getValue4(graph, containerCluster, subject, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { Object value = g.getValue(subject); if(value != null) { if(result != null) { throw new DatabaseException("Multiple values"); } else { result = value; } } } // FIXME: throw something here instead return (T)"name"; } ClusterImpl cluster = containerCluster; if(!containerCluster.contains(subject)) { cluster = session.clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(); return getValue4(graph, containerCluster, subject, procedure); } } if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {}); return getValue4(graph, containerCluster, subject, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { Object value = g.getValue(subject); if(value != null) { if(result != null) { throw new DatabaseException("Multiple values"); } else { result = value; } } } if(result != null) { return (T)result; } else { if(ClusterTypeEnum.SMALL == cluster.getType()) return getDirectValue4(graph, (ClusterSmall)cluster, subject); else return getDirectValue4(graph, (ClusterBig)cluster, subject); } } else { if(ClusterTypeEnum.SMALL == cluster.getType()) return getDirectValue4(graph, (ClusterSmall)cluster, subject); else return getDirectValue4(graph, (ClusterBig)cluster, subject); } } @SuppressWarnings("unchecked") private T getValue4(final ReadGraphImpl graph, final ClusterImpl containerCluster, final int subject, final C context, final ForPossibleRelatedValueContextProcedure procedure) throws DatabaseException { Object result = null; if(subject < 0) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {}); return getValue4(graph, containerCluster, subject, context, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { Object value = g.getValue(subject); if(value != null) { if(result != null) { throw new DatabaseException("Multiple values"); } else { result = value; } } } // FIXME: throw something here instead return (T)"name"; } ClusterImpl cluster = containerCluster; if(!containerCluster.contains(subject)) { cluster = session.clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(); return getValue4(graph, containerCluster, subject, context, procedure); } } if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, g -> {}); return getValue4(graph, containerCluster, subject, context, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { Object value = g.getValue(subject); if(value != null) { if(result != null) { throw new DatabaseException("Multiple values"); } else { result = value; } } } if(result != null) { return (T)result; } else { if(ClusterTypeEnum.SMALL == cluster.getType()) return getDirectValue4(graph, (ClusterSmall)cluster, subject); else return getDirectValue4(graph, (ClusterBig)cluster, subject); } } else { if(ClusterTypeEnum.SMALL == cluster.getType()) return getDirectValue4(graph, (ClusterSmall)cluster, subject); else return getDirectValue4(graph, (ClusterBig)cluster, subject); } } private T getRelatedDirectValue4(final ReadGraphImpl graph, final ClusterImpl cluster, final int subject, final int result, final ForPossibleRelatedValueProcedure procedure) throws DatabaseException { int so = cluster.getSingleObject(subject, procedure, session.clusterTranslator); if(so == 0) { if(result == 0) { throw new DatabaseException("No objects " + subject + " " + procedure.predicateKey); } else { return getValue4(graph, cluster, result, procedure); } } else { if(result == 0) { return getValue4(graph, cluster, so, procedure); } else { throw new DatabaseException("Multiple objects"); } } } private T getRelatedDirectValue4(final ReadGraphImpl graph, final ClusterImpl cluster, final int subject, final int result, final C context, final ForPossibleRelatedValueContextProcedure procedure) throws DatabaseException { int so = cluster.getSingleObject(subject, procedure, session.clusterTranslator); if(so == 0) { if(result == 0) { throw new NoSingleResultException("No objects " + subject + " " + procedure.predicateKey, result); } else { return getValue4(graph, cluster, result, context, procedure); } } else { if(result == 0) { return getValue4(graph, cluster, so, context, procedure); } else { throw new NoSingleResultException("Multiple objects for " + subject + " " + procedure.predicateKey, result); } } } public T getRelatedValue4(final ReadGraphImpl graph, final int subject, final ForPossibleRelatedValueProcedure procedure) throws DatabaseException { int result = 0; final int predicate = procedure.predicateKey; if(subject < 0) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {}); return getRelatedValue4(graph, subject, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { for (int id : g.getObjects(subject, predicate)) { if(result != 0) { throw new DatabaseException("Multiple objects"); } else { result = id; } } } if(result == 0) { throw new DatabaseException("No objects for " + subject ); } else { return getValue4(graph, null, result, procedure); } } final org.simantics.db.procore.cluster.ClusterImpl cluster = session.clusterTable.getClusterByResourceKey(subject); if(!cluster.isLoaded()) { cluster.load(); return getRelatedValue4(graph, subject, procedure); } if(cluster.hasVirtual() && session.virtualGraphServerSupport.virtuals.contains(subject)) { if(!SessionImplSocket.areVirtualStatementsLoaded(session.virtualGraphServerSupport, subject, predicate)) { SessionImplSocket.loadVirtualStatements(session.virtualGraphServerSupport, graph, subject, predicate, g -> {}); return getRelatedValue4(graph, subject, procedure); } for(TransientGraph g : session.virtualGraphServerSupport.providers) { for (int id : g.getObjects(subject, predicate)) { if(result != 0) { throw new DatabaseException("Multiple objects"); } else { result = id; } } } return getRelatedDirectValue4(graph, cluster, subject, result, procedure); } else { return getRelatedDirectValue4(graph, cluster, subject, 0, procedure); } } /* private void getDirectValue4(final ReadGraphImpl graph, final ClusterSmall cluster, final int subject, final C context, final ForPossibleRelatedValueContextProcedure procedure) { try { byte[] bytes = cluster.getValue(subject, session.clusterTranslator); T value = (T)utf(bytes); procedure.execute(graph, context, value); } catch (DatabaseException e) { procedure.execute(graph, context, null); } // graph.dec(); } */ @SuppressWarnings("unchecked") private T getDirectValue4(final ReadGraphImpl graph, final ClusterBig cluster, final int subject) throws DatabaseException { byte[] bytes = cluster.getValue(subject, session.clusterTranslator); return (T) utf(bytes, 0); } @SuppressWarnings("unchecked") private T getDirectValue4(final ReadGraphImpl graph, final ClusterSmall cluster, final int subject) throws DatabaseException { // Note: this code avoids creating an intermediate byte[] // to store the encoded string bytes and reads the UTF string // from the value table byte[] directly into String instead. ResourceTableSmall rt = cluster.resourceTable; ValueTableSmall vt = cluster.valueTable; byte[] bs = vt.table; long[] ls = rt.table; int index = ((subject&0xFFF) << 1) - 1 + rt.offset; int valueIndex = ((int)(ls[index] >>> 24) & 0x3FFFFF) + vt.offset; int size = bs[valueIndex++]; if (size < 0) // two byte size size = (int)(((size & 0x7F) << 8) | (bs[valueIndex++] & 0xFF)); if (size <= 0) throw new DatabaseException("No value for " + subject); return (T) utf(bs, valueIndex); } private static final String utf(byte[] bytes, int offset) throws AssumptionException { if(bytes == null) return null; // Read databoard int32 using Length encoding // https://dev.simantics.org/index.php/Databoard_Specification#Length int index = offset; int length = bytes[index++]&0xff; if(length >= 0x80) { if(length >= 0xc0) { if(length >= 0xe0) { if(length >= 0xf0) { length &= 0x0f; length += ((bytes[index++]&0xff)<<3); length += ((bytes[index++]&0xff)<<11); length += ((bytes[index++]&0xff)<<19); length += 0x10204080; } else { length &= 0x1f; length += ((bytes[index++]&0xff)<<4); length += ((bytes[index++]&0xff)<<12); length += ((bytes[index++]&0xff)<<20); length += 0x204080; } } else { length &= 0x3f; length += ((bytes[index++]&0xff)<<5); length += ((bytes[index++]&0xff)<<13); length += 0x4080; } } else { length &= 0x7f; length += ((bytes[index++]&0xff)<<6); length += 0x80; } } // Copied from DataInputStream int utflen = length; char[] chararr = new char[utflen]; int c, char2, char3; int count = index; int target = index + length; int chararr_count=0; while (count < target) { c = (int) bytes[count] & 0xff; if (c > 127) break; count++; chararr[chararr_count++]=(char)c; } while (count < target) { c = (int) bytes[count] & 0xff; switch (c >> 4) { case 0: case 1: case 2: case 3: case 4: case 5: case 6: case 7: /* 0xxxxxxx*/ count++; chararr[chararr_count++]=(char)c; break; case 12: case 13: /* 110x xxxx 10xx xxxx*/ count += 2; if (count > target) throw new AssumptionException( "malformed input: partial character at end (" + (count-index) + " > " + utflen + ")"); char2 = (int) bytes[count-1]; if ((char2 & 0xC0) != 0x80) throw new AssumptionException( "malformed input around byte " + count); chararr[chararr_count++]=(char)(((c & 0x1F) << 6) | (char2 & 0x3F)); break; case 14: /* 1110 xxxx 10xx xxxx 10xx xxxx */ count += 3; if (count > target) throw new AssumptionException( "malformed input: partial character at end (" + (count-index) + " > " + utflen + ")"); char2 = (int) bytes[count-2]; char3 = (int) bytes[count-1]; if (((char2 & 0xC0) != 0x80) || ((char3 & 0xC0) != 0x80)) throw new AssumptionException( "malformed input around byte " + (count-1)); chararr[chararr_count++]=(char)(((c & 0x0F) << 12) | ((char2 & 0x3F) << 6) | ((char3 & 0x3F) << 0)); break; default: /* 10xx xxxx, 1111 xxxx */ throw new AssumptionException( "malformed input around byte " + count); } } // The number of chars produced may be less than utflen return new String(chararr, 0, chararr_count); } }