X-Git-Url: https://gerrit.simantics.org/r/gitweb?a=blobdiff_plain;f=bundles%2Forg.simantics.db.impl%2Fsrc%2Forg%2Fsimantics%2Fdb%2Fimpl%2Fquery%2FQueryCacheBase.java;h=2d3c92d8242ed3259ea0518520262666cbabd423;hb=de8b6e194da6d56c410101d9c72763078b0e8717;hp=e2737f891385911bdb321139dcd7b93a45ff518c;hpb=1e957fc9da518f3bef8a2c19cad72772087e1b6a;p=simantics%2Fplatform.git diff --git a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java index e2737f891..2d3c92d82 100644 --- a/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java +++ b/bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryCacheBase.java @@ -2,19 +2,23 @@ package org.simantics.db.impl.query; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import org.simantics.db.AsyncReadGraph; +import org.simantics.db.ObjectResourceIdMap; import org.simantics.db.RelationInfo; import org.simantics.db.common.utils.Logger; import org.simantics.db.exception.DatabaseException; import org.simantics.db.impl.DebugPolicy; import org.simantics.db.impl.graph.ReadGraphImpl; import org.simantics.db.impl.procedure.InternalProcedure; +import org.simantics.db.impl.query.QueryProcessor.SessionTask; import org.simantics.db.procedure.AsyncMultiProcedure; import org.simantics.db.procedure.AsyncProcedure; import org.simantics.db.procedure.Listener; import org.simantics.db.procedure.ListenerBase; +import org.simantics.db.procedure.Procedure; import org.simantics.db.request.AsyncMultiRead; import org.simantics.db.request.AsyncRead; import org.simantics.db.request.ExternalRead; @@ -38,7 +42,8 @@ public class QueryCacheBase { public boolean collecting = false; final protected THashMap uRIToResourceMap; - final protected THashMap namespaceIndexMap; + //final protected THashMap namespaceIndexMap; + final protected UnaryQueryHashMap>> childMapMap; final protected DoubleKeyQueryHashMap objectsMap; final protected DoubleKeyQueryHashMap assertedStatementsMap; final protected DoubleKeyQueryHashMap directObjectsMap; @@ -79,7 +84,8 @@ public class QueryCacheBase { valueQueryMap = new UnaryQueryHashMap(); principalTypesMap = new UnaryQueryHashMap(); uRIToResourceMap = new THashMap(); - namespaceIndexMap = new THashMap(); + //namespaceIndexMap = new THashMap(); + childMapMap = new UnaryQueryHashMap>>(); relationInfoQueryMap = new UnaryQueryHashMap(); typeHierarchyMap = new UnaryQueryHashMap(); superTypesMap = new UnaryQueryHashMap(); @@ -197,9 +203,9 @@ public class QueryCacheBase { // Just for safety if(entry.isDiscarded()) return; - if(entry.isExcepted()) entry.setPending(); if(used.compareAndSet(false, true)) { + //entry.setPending(); entry.addOrSet(parentGraph.processor, result); procedure.execute(parentGraph, result); } else { @@ -373,12 +379,15 @@ public class QueryCacheBase { } - synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { + public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) { if (parent != null && !inferred) { try { - if(!child.isImmutable(graph)) - child.addParent(parent); + if(!child.isImmutable(graph)) { + synchronized(child) { + child.addParent(parent); + } + } } catch (DatabaseException e) { Logger.defaultLogError(e); } @@ -393,7 +402,7 @@ public class QueryCacheBase { } - public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { + public synchronized ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) { assert (entry != null); @@ -510,7 +519,8 @@ public class QueryCacheBase { realSize += directSuperRelationsMap.size(); realSize += principalTypesMap.size(); realSize += uRIToResourceMap.size(); - realSize += namespaceIndexMap.size(); + //realSize += namespaceIndexMap.size(); + realSize += childMapMap.size(); realSize += relationInfoQueryMap.size(); realSize += superTypesMap.size(); @@ -547,9 +557,11 @@ public class QueryCacheBase { for(CacheEntryBase e : uRIToResourceMap.values()) if(e.getLevel() <= level) result.add(e); - for(CacheEntryBase e : namespaceIndexMap.values()) - if(e.getLevel() <= level) - result.add(e); +// for(CacheEntryBase e : namespaceIndexMap.values()) +// if(e.getLevel() <= level) +// result.add(e); + + childMapMap.values(level, result); relationInfoQueryMap.values(level, result); superTypesMap.values(level, result); @@ -585,7 +597,8 @@ public class QueryCacheBase { entries.addAll(directSuperRelationsMap.values()); entries.addAll(principalTypesMap.values()); entries.addAll(uRIToResourceMap.values()); - entries.addAll(namespaceIndexMap.values()); + //entries.addAll(namespaceIndexMap.values()); + entries.addAll(childMapMap.values()); entries.addAll(relationInfoQueryMap.values()); entries.addAll(superTypesMap.values()); entries.addAll(superRelationsMap.values()); @@ -622,29 +635,36 @@ public class QueryCacheBase { } } - public static void waitPending(CacheEntry entry) throws DatabaseException { + public static void waitPending(QueryProcessor processor, CacheEntry entry) throws DatabaseException { int counter = 0; while(entry.isPending()) { try { - Thread.sleep(1); - counter++; - if(counter > 1000) { - CacheEntryBase base = ((CacheEntryBase)entry); -// if(base.created != null) { -// System.err.println("created:"); -// base.created.printStackTrace(); -// } -// if(base.performed != null) { -// System.err.println("performed:"); -// base.performed.printStackTrace(); -// } -// if(base.ready != null) { -// System.err.println("ready:"); -// base.ready.printStackTrace(); -// } - System.err.println("asd"); - //base.getQuery().recompute(null, null, entry); + SessionTask task = null;//processor.getOwnTask(processor.thread.get()); + if(task != null) { + task.run(processor.thread.get()); + } else { + Thread.sleep(1); + counter++; + if(counter > 5000) { + CacheEntryBase base = ((CacheEntryBase)entry); +// if(base.created != null) { +// System.err.println("created:"); +// base.created.printStackTrace(); +// } +// if(base.performed != null) { +// System.err.println("performed:"); +// base.performed.printStackTrace(); +// } +// if(base.ready != null) { +// System.err.println("ready:"); +// base.ready.printStackTrace(); +// } + new Exception("Timeout waiting for request to complete: " + entry.getOriginalRequest().toString()).printStackTrace(); + throw new DatabaseException("Timeout waiting for request to complete."); + //System.err.println("asd"); + //base.getQuery().recompute(null, null, entry); + } } } catch (InterruptedException e) { } @@ -796,6 +816,19 @@ public class QueryCacheBase { }; + protected static InternalProcedure> emptyChildMapProcedure = new InternalProcedure>() { + + @Override + public void execute(ReadGraphImpl graph, ObjectResourceIdMap i) { + } + + @Override + public void exception(ReadGraphImpl graph, Throwable throwable) { + } + + }; + + protected static IntProcedure emptyIntProcedure = new IntProcedure() { @@ -878,6 +911,7 @@ public class QueryCacheBase { protected static InternalProcedure emptyProcedureURIToResource = emptyIntegerProcedure; protected static InternalProcedure> emptyProcedureNamespaceIndex = emptyNamespaceProcedure; + protected static InternalProcedure> emptyProcedureChildMap = emptyChildMapProcedure; protected static InternalProcedure emptyProcedureRelationInfoQuery = emptyRelationInfoProcedure; protected static AsyncProcedure emptyProcedureReadEntry = emptyAsyncProcedure; @@ -891,6 +925,7 @@ public class QueryCacheBase { private AsyncProcedure procedure; private T result = null; private Throwable throwable = null; + private Semaphore s = new Semaphore(0); AsyncProcedureWrapper(AsyncProcedure procedure) { this.procedure = procedure; @@ -900,15 +935,22 @@ public class QueryCacheBase { public void execute(AsyncReadGraph graph, T result) { if(procedure != null) procedure.execute(graph, result); this.result = result; + s.release(); } @Override public void exception(AsyncReadGraph graph, Throwable throwable) { if(procedure != null) procedure.exception(graph, throwable); this.throwable = throwable; + s.release(); } public T get() throws DatabaseException { + try { + s.acquire(); + } catch (InterruptedException e) { + e.printStackTrace(); + } if(throwable != null) { if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; else throw new DatabaseException(throwable); @@ -919,6 +961,39 @@ public class QueryCacheBase { } + static class ExternalProcedureWrapper implements AsyncProcedure { + + private Procedure procedure; + private T result = null; + private Throwable throwable = null; + + ExternalProcedureWrapper(Procedure procedure) { + this.procedure = procedure; + } + + @Override + public void execute(AsyncReadGraph graph, T result) { + if(procedure != null) procedure.execute(result); + this.result = result; + } + + @Override + public void exception(AsyncReadGraph graph, Throwable throwable) { + if(procedure != null) procedure.exception(throwable); + this.throwable = throwable; + } + + public T get() throws DatabaseException { + if(throwable != null) { + if(throwable instanceof DatabaseException) throw (DatabaseException)throwable; + else throw new DatabaseException(throwable); + } else { + return result; + } + } + + } + static class InternalProcedureWrapper implements InternalProcedure { @@ -956,11 +1031,12 @@ public class QueryCacheBase { static class IntSetWrapper implements IntProcedure { private IntProcedure procedure; - private IntSet result = new IntSet(); + final private IntSet result; private Throwable throwable = null; - IntSetWrapper(IntProcedure procedure) { + IntSetWrapper(ReadGraphImpl graph, IntProcedure procedure) { this.procedure = procedure; + result = new IntSet(graph.processor.querySupport); } @Override @@ -1031,59 +1107,95 @@ public class QueryCacheBase { } + public static T resultExternalReadEntry(ReadGraphImpl graph, ExternalRead r, CacheEntry parent, ListenerBase listener, Procedure procedure) throws DatabaseException { + ExternalProcedureWrapper wrap = new ExternalProcedureWrapper<>(procedure); + QueryCache.runnerExternalReadEntry(graph, r, parent, listener, wrap); + return wrap.get(); + } + public static T resultReadEntry(ReadGraphImpl graph, Read r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { AsyncProcedureWrapper wrap = new AsyncProcedureWrapper<>(procedure); - QueryCache.runnerReadEntry(graph, r, parent, listener, wrap); + QueryCache.runnerReadEntry(graph, r, parent, listener, wrap, true); return wrap.get(); } - public static byte[] resultValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static T resultAsyncReadEntry(ReadGraphImpl graph, AsyncRead r, CacheEntry parent, ListenerBase listener, AsyncProcedure procedure) throws DatabaseException { + AsyncProcedureWrapper wrap = new AsyncProcedureWrapper<>(procedure); + QueryCache.runnerAsyncReadEntry(graph, r, parent, listener, wrap, true); + return wrap.get(); + } + + public static byte[] resultValueQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerValueQuery(graph, r, parent, listener, wrap); return wrap.get(); } - public static RelationInfo resultRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static RelationInfo resultRelationInfoQuery(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerRelationInfoQuery(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntSet resultSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static IntSet resultSuperRelations(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerSuperRelations(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntSet resultSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static IntSet resultSuperTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerSuperTypes(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntSet resultTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static IntSet resultTypes(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerTypes(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntSet resultPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static IntSet resultPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerPredicates(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntSet resultDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener, InternalProcedure procedure) throws DatabaseException { - InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(procedure); + public static IntSet resultDirectPredicates(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper<>(null); QueryCache.runnerDirectPredicates(graph, r, parent, listener, wrap); return wrap.get(); } - public static IntArray resultAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener, TripleIntProcedure procedure) throws DatabaseException { - TripleIntProcedureWrapper wrap = new TripleIntProcedureWrapper(procedure); + public static IntArray resultAssertedStatements(ReadGraphImpl graph, int r1, int r2, CacheEntry parent, ListenerBase listener) throws DatabaseException { + TripleIntProcedureWrapper wrap = new TripleIntProcedureWrapper(null); QueryCache.runnerAssertedStatements(graph, r1, r2, parent, listener, wrap); return wrap.get(); } + public static Integer resultURIToResource(ReadGraphImpl graph, String id, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper wrap = new InternalProcedureWrapper(null); + QueryCache.runnerURIToResource(graph, id, parent, listener, wrap); + return wrap.get(); + } + + public static ObjectResourceIdMap resultChildMap(ReadGraphImpl graph, int r, CacheEntry parent, ListenerBase listener) throws DatabaseException { + InternalProcedureWrapper> wrap = new InternalProcedureWrapper>(null); + QueryCache.runnerChildMap(graph, r, parent, listener, wrap); + return wrap.get(); + } + + static boolean shouldCache(QueryProcessor processor, int r) { + return processor.isImmutable(r); + } + + static boolean shouldCache(QueryProcessor processor, int r, int r2) { + return processor.isImmutable(r); + } + + static boolean shouldCache(QueryProcessor processor, Object o) { + return false; + } + }