package org.simantics.db.impl.query;

import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;

import org.simantics.db.AsyncReadGraph;
import org.simantics.db.RelationInfo;
import org.simantics.db.common.utils.Logger;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.procedure.InternalProcedure;
import org.simantics.db.procedure.AsyncMultiProcedure;
import org.simantics.db.procedure.AsyncProcedure;
import org.simantics.db.procedure.Listener;
import org.simantics.db.procedure.ListenerBase;
import org.simantics.db.procedure.Procedure;
import org.simantics.db.request.AsyncMultiRead;
import org.simantics.db.request.AsyncRead;
import org.simantics.db.request.ExternalRead;
import org.simantics.db.request.MultiRead;
import org.simantics.db.request.Read;

import gnu.trove.map.hash.THashMap;

/**
 * Holds the per-query-type cache maps and the listener registry, and drives
 * the evaluation of {@link Read}, {@link AsyncRead}, {@link ExternalRead} and
 * {@link AsyncMultiRead} requests against those maps.
 *
 * <p>NOTE(review): the type arguments of the cache map fields appear to have
 * been lost before this file reached review. The fields are declared raw here;
 * restore the exact parameterizations from version control. The code below
 * uses explicit casts so it compiles with raw or parameterized fields.
 */
public class QueryCache {

    final public UnaryQueryHashMap directPredicatesMap;
    final public UnaryQueryHashMap principalTypesMap;
    final public THashMap uriToResourceMap;
    final public THashMap namespaceIndexMap22;
    final public UnaryQueryHashMap projectsMap;
    final public UnaryQueryHashMap relationInfoMap;
    final public UnaryQueryHashMap superTypesMap;
    final public UnaryQueryHashMap typeHierarchyMap;
    final public UnaryQueryHashMap superRelationsMap;
    final public UnaryQueryHashMap typesMap;
    final public UnaryQueryHashMap valueMap;
    final public DoubleKeyQueryHashMap directObjectsMap;
    final public DoubleKeyQueryHashMap objectsMap;
    final public UnaryQueryHashMap orderedSetMap;
    final public UnaryQueryHashMap predicatesMap;
    final public DoubleKeyQueryHashMap statementsMap;
    final public UnaryQueryHashMap assertedPredicatesMap;
    final public BinaryQueryHashMap assertedStatementsMap;
    final public StableHashMap externalReadMap;
    final public StableHashMap asyncReadMap;
    final public StableHashMap readMap;
    final public StableHashMap asyncMultiReadMap;
    final public StableHashMap multiReadMap;

    /** Listeners registered per cache entry. */
    final THashMap<CacheEntry, ArrayList<ListenerEntry>> listeners;

    /** Creates a cache with every map empty. */
    public QueryCache() {
        directPredicatesMap = new UnaryQueryHashMap();
        valueMap = new UnaryQueryHashMap();
        principalTypesMap = new UnaryQueryHashMap();
        uriToResourceMap = new THashMap();
        namespaceIndexMap22 = new THashMap();
        projectsMap = new UnaryQueryHashMap();
        relationInfoMap = new UnaryQueryHashMap();
        typeHierarchyMap = new UnaryQueryHashMap();
        superTypesMap = new UnaryQueryHashMap();
        superRelationsMap = new UnaryQueryHashMap();
        typesMap = new UnaryQueryHashMap();
        objectsMap = new DoubleKeyQueryHashMap();
        orderedSetMap = new UnaryQueryHashMap();
        predicatesMap = new UnaryQueryHashMap();
        statementsMap = new DoubleKeyQueryHashMap();
        directObjectsMap = new DoubleKeyQueryHashMap();
        assertedPredicatesMap = new UnaryQueryHashMap();
        assertedStatementsMap = new BinaryQueryHashMap();
        asyncReadMap = new StableHashMap();
        readMap = new StableHashMap();
        asyncMultiReadMap = new StableHashMap();
        multiReadMap = new StableHashMap();
        externalReadMap = new StableHashMap();
        listeners = new THashMap<CacheEntry, ArrayList<ListenerEntry>>(10, 0.75f);
    }

    /**
     * Computes the hash of a request object.
     *
     * @param object the request
     * @return {@code object.hashCode()}, or 0 if computing it throws; the
     *         failure (including a {@code null} request) is logged
     */
    public int requestHash(Object object) {
        try {
            return object.hashCode();
        } catch (Throwable t) {
            Logger.defaultLogError(t);
            return 0;
        }
    }

    /** Describes the runtime type of a request for error messages. */
    private static String describe(Object query) {
        return query == null ? "null" : query.getClass().getName();
    }

    /**
     * Looks up the cache entry of a request in the map matching its kind.
     *
     * @return the cached entry, or {@code null} if the map has none
     * @throws IllegalStateException if the request is not one of the four
     *         supported kinds
     */
    @SuppressWarnings("unchecked")
    private CacheEntryBase getCached(Object query, int hash) {
        // The explicit cast keeps this valid whether the map fields are raw
        // or parameterized.
        if (query instanceof AsyncRead)
            return (CacheEntryBase) asyncReadMap.get(query, hash);
        else if (query instanceof Read)
            return (CacheEntryBase) readMap.get(query, hash);
        else if (query instanceof ExternalRead)
            return (CacheEntryBase) externalReadMap.get(query, hash);
        else if (query instanceof AsyncMultiRead)
            return (CacheEntryBase) asyncMultiReadMap.get(query, hash);
        throw new IllegalStateException("Unsupported query type: " + describe(query));
    }

    /**
     * Creates a new entry for the request, stores it in the matching map,
     * marks it pending and clears its result.
     *
     * @throws IllegalStateException if the request is not one of the four
     *         supported kinds
     */
    @SuppressWarnings("unchecked")
    private CacheEntryBase createEntry(QuerySupport support, Object query, int hash) {
        CacheEntryBase result;
        if (query instanceof AsyncRead) {
            AsyncReadEntry entry = new AsyncReadEntry((AsyncRead) query);
            asyncReadMap.put((AsyncRead) query, entry, hash);
            result = entry;
        } else if (query instanceof Read) {
            ReadEntry entry = new ReadEntry((Read) query);
            readMap.put((Read) query, entry, hash);
            result = entry;
        } else if (query instanceof ExternalRead) {
            ExternalReadEntry entry = new ExternalReadEntry((ExternalRead) query);
            externalReadMap.put((ExternalRead) query, entry, hash);
            result = entry;
        } else if (query instanceof AsyncMultiRead) {
            AsyncMultiReadEntry entry = new AsyncMultiReadEntry((AsyncMultiRead) query);
            asyncMultiReadMap.put((AsyncMultiRead) query, entry, hash);
            result = entry;
        } else {
            throw new IllegalStateException("Unsupported query type: " + describe(query));
        }
        result.setPending();
        result.clearResult(support);
        return result;
    }

    /**
     * Entry point for evaluating a request through the cache.
     *
     * @param graph     the graph the request is evaluated in
     * @param query     the request; must be an {@link AsyncRead}, {@link Read},
     *                  {@link ExternalRead} or {@link AsyncMultiRead}
     * @param parent    the entry that depends on this request, or {@code null}
     * @param listener  the listener to register for the request, or {@code null}
     * @param procedure the procedure that receives the result
     * @return whatever the entry or {@link #performQuery} returns for this
     *         request kind
     * @throws DatabaseException     propagated from the entry or the request
     * @throws IllegalStateException if the request kind is unsupported, or if
     *         a parent or listener is given and the cached entry is pending
     */
    public final Object runQuery(final ReadGraphImpl graph, final Object query, final CacheEntry parent,
            final ListenerBase listener, final Object procedure) throws DatabaseException {

        int hash = requestHash(query);
        CacheEntryBase entry = getCached(query, hash);

        if (parent == null && listener == null) {
            if (entry != null && (entry.isReady() || entry.isExcepted())) {
                return entry.performFromCache(graph, this, procedure);
            }
            // performQuery dereferences the entry unconditionally, so an
            // uncached request needs an entry before it can be performed.
            if (entry == null) {
                entry = createEntry(graph.processor.querySupport, query, hash);
            }
            return performQuery(graph, query, entry, null, procedure);
        }

        if (entry == null) {
            entry = createEntry(graph.processor.querySupport, query, hash);
            return performForEach(graph, query, entry, parent, listener, procedure, false);
        }

        if (entry.isPending()) {
            synchronized (entry) {
                // Re-checked while holding the entry's monitor. Attaching to
                // an already pending entry is not implemented.
                if (entry.isPending()) {
                    throw new IllegalStateException("Cache entry is still pending: " + entry);
                }
            }
        }

        if (entry.isReady()) {
            Object result = entry.performFromCache(graph, this, procedure);
            registerDependencies(graph, entry, parent, listener, procedure, false);
            return result;
        }
        return performForEach(graph, query, entry, parent, listener, procedure, false);
    }

    /**
     * Performs the request and stores the outcome in its cache entry,
     * dispatching on the request kind.
     *
     * @param parentGraph   the graph of the caller; results are reported to
     *                      the procedure in this graph
     * @param query_        the request
     * @param entry_        the cache entry of the request; must not be
     *                      {@code null}
     * @param listenerEntry listener entry to prime with the result, or
     *                      {@code null}
     * @param procedure_    the procedure receiving the result; its required
     *                      type depends on the request kind
     * @return {@code null} for {@link AsyncRead}; the value of the entry's
     *         {@code get} for {@link Read}; the entry's {@code getResult()}
     *         for {@link ExternalRead} and {@link AsyncMultiRead}
     * @throws DatabaseException     propagated from the entry
     * @throws IllegalStateException if the request kind is unsupported
     */
    @SuppressWarnings("unchecked")
    public <T> Object performQuery(ReadGraphImpl parentGraph, final Object query_, final CacheEntryBase entry_,
            ListenerEntry listenerEntry, Object procedure_) throws DatabaseException {

        ReadGraphImpl queryGraph = parentGraph.withParent(entry_);

        if (query_ instanceof AsyncRead) {
            return performAsyncRead(parentGraph, queryGraph, (AsyncRead<T>) query_, (AsyncReadEntry) entry_,
                    listenerEntry, (AsyncProcedure<T>) procedure_);
        } else if (query_ instanceof Read) {
            return performRead(parentGraph, queryGraph, (Read<T>) query_, (ReadEntry) entry_,
                    listenerEntry, procedure_);
        } else if (query_ instanceof ExternalRead) {
            return performExternalRead(parentGraph, (ExternalRead<T>) query_, (ExternalReadEntry) entry_,
                    (Procedure<T>) procedure_);
        } else if (query_ instanceof AsyncMultiRead) {
            return performAsyncMultiRead(parentGraph, queryGraph, (AsyncMultiRead<T>) query_,
                    (AsyncMultiReadEntry) entry_, (AsyncMultiProcedure<T>) procedure_);
        }
        throw new IllegalStateException("Unsupported query type: " + describe(query_));
    }

    /** Performs an {@link AsyncRead}, recording the outcome in the entry and forwarding it to the procedure. */
    @SuppressWarnings("unchecked")
    private <T> Object performAsyncRead(final ReadGraphImpl parentGraph, ReadGraphImpl queryGraph,
            AsyncRead<T> query, final AsyncReadEntry entry, final ListenerEntry listenerEntry,
            final AsyncProcedure<T> procedure) {
        try {
            query.perform(queryGraph, new AsyncProcedure<T>() {

                @Override
                public void execute(AsyncReadGraph returnGraph, T result) {
                    entry.addOrSet(parentGraph, result);
                    if (listenerEntry != null) {
                        primeListenerEntry(listenerEntry, result);
                    }
                    try {
                        procedure.execute(parentGraph, result);
                    } catch (Throwable t) {
                        // A failing procedure must not disturb the cached result.
                        Logger.defaultLogError(t);
                    }
                }

                @Override
                public void exception(AsyncReadGraph returnGraph, Throwable t) {
                    entry.except(parentGraph, t);
                    try {
                        procedure.exception(parentGraph, t);
                    } catch (Throwable t2) {
                        Logger.defaultLogError(t2);
                    }
                }

                @Override
                public String toString() {
                    return procedure.toString();
                }

            });
        } catch (Throwable t) {
            entry.except(t);
            try {
                procedure.exception(parentGraph, t);
            } catch (Throwable t2) {
                Logger.defaultLogError(t2);
            }
        }
        return null;
    }

    /** Performs a synchronous {@link Read} and returns what the entry's {@code get} yields afterwards. */
    @SuppressWarnings("unchecked")
    private <T> Object performRead(ReadGraphImpl parentGraph, ReadGraphImpl queryGraph, Read<T> query,
            ReadEntry entry, ListenerEntry listenerEntry, Object procedure) throws DatabaseException {
        entry.setPending();
        try {
            T result = query.perform(queryGraph);
            entry.addOrSet(queryGraph, result);
            if (listenerEntry != null) {
                primeListenerEntry(listenerEntry, result);
            }
            return entry.get(parentGraph, parentGraph.processor, procedure);
        } catch (Throwable t) {
            // Record the failure and report through the entry.
            entry.except(t);
            return entry.get(parentGraph, parentGraph.processor, procedure);
        }
    }

    /** Registers a listener with an {@link ExternalRead} that feeds values into the entry. */
    @SuppressWarnings("unchecked")
    private <T> Object performExternalRead(final ReadGraphImpl parentGraph, final ExternalRead<T> query,
            final ExternalReadEntry entry, final Procedure<T> procedure) {
        try {
            query.register(parentGraph, new Listener<T>() {

                // False until the first value or exception has been delivered
                // to the procedure.
                private final AtomicBoolean used = new AtomicBoolean(false);

                @Override
                public void execute(T result) {
                    // Just for safety
                    if (entry.isDiscarded()) {
                        return;
                    }
                    if (entry.isExcepted()) {
                        entry.setPending();
                    }
                    if (used.compareAndSet(false, true)) {
                        entry.addOrSet(parentGraph.processor, result);
                        procedure.execute(result);
                    } else {
                        // Later values are queued on the entry and the
                        // processor is notified.
                        entry.queue(result);
                        parentGraph.processor.updatePrimitive(query);
                    }
                }

                @Override
                public void exception(Throwable t) {
                    entry.except(t);
                    if (used.compareAndSet(false, true)) {
                        procedure.exception(t);
                    } else {
                        parentGraph.processor.updatePrimitive(query);
                    }
                }

                @Override
                public String toString() {
                    return procedure.toString();
                }

                @Override
                public boolean isDisposed() {
                    return entry.isDiscarded() || !parentGraph.processor.isBound(entry);
                }

            });
            return entry.getResult();
        } catch (Throwable t) {
            entry.except(t);
            procedure.exception(t);
            return entry.getResult();
        }
    }

    /** Performs an {@link AsyncMultiRead}, accumulating results in the entry and forwarding them to the procedure. */
    @SuppressWarnings("unchecked")
    private <T> Object performAsyncMultiRead(final ReadGraphImpl parentGraph, ReadGraphImpl queryGraph,
            AsyncMultiRead<T> query, final AsyncMultiReadEntry entry, final AsyncMultiProcedure<T> procedure) {
        try {
            query.perform(queryGraph, new AsyncMultiProcedure<T>() {

                @Override
                public void execute(AsyncReadGraph graph, T result) {
                    entry.addOrSet(result);
                    try {
                        procedure.execute(parentGraph, result);
                    } catch (Throwable t) {
                        Logger.defaultLogError(t);
                    }
                }

                @Override
                public void finished(AsyncReadGraph graph) {
                    entry.finish(parentGraph);
                    try {
                        procedure.finished(parentGraph);
                    } catch (Throwable t) {
                        Logger.defaultLogError(t);
                    }
                }

                @Override
                public void exception(AsyncReadGraph graph, Throwable t) {
                    entry.except(parentGraph, t);
                    try {
                        procedure.exception(parentGraph, t);
                    } catch (Throwable t2) {
                        Logger.defaultLogError(t2);
                    }
                }

            });
            return entry.getResult();
        } catch (Throwable t) {
            entry.except(t);
            try {
                procedure.exception(parentGraph, t);
            } catch (Throwable t2) {
                Logger.defaultLogError(t2);
            }
            return entry.getResult();
        }
    }

    /**
     * Serves the request from its cache entry and, if a listener entry is
     * given, primes it with the returned value.
     *
     * @param query not used by this method
     * @return the value returned by the entry's {@code performFromCache}
     */
    public Object performFromCache(ReadGraphImpl parentGraph, Object query, CacheEntryBase entry_,
            ListenerEntry listenerEntry, Object procedure_) throws DatabaseException {
        Object result = entry_.performFromCache(parentGraph, this, procedure_);
        if (listenerEntry != null) {
            primeListenerEntry(listenerEntry, result);
        }
        return result;
    }

    /**
     * Registers dependencies for the entry and then either performs the
     * request (entry not ready) or serves it from the cache (entry ready).
     *
     * @param inferredDependency passed on to {@link #registerDependencies};
     *                           when {@code true} the parent is not attached
     */
    public Object performForEach(ReadGraphImpl parentGraph, final Object query, final CacheEntryBase entry,
            final CacheEntry parent, final ListenerBase base, final Object procedure,
            boolean inferredDependency) throws DatabaseException {

        assert (!entry.isDiscarded());

        final ListenerEntry listenerEntry =
                registerDependencies(parentGraph, entry, parent, base, procedure, inferredDependency);

        // FRESH, REFUTED, EXCEPTED go here
        if (!entry.isReady()) {
            entry.setPending();
            return performQuery(parentGraph, query, entry, listenerEntry, procedure);
        }
        return performFromCache(parentGraph, query, entry, listenerEntry, procedure);
    }

    /**
     * Attaches {@code parent} to {@code child} (unless the dependency is
     * inferred or the child reports itself immutable) and registers the
     * listener, if one is given.
     *
     * @return the listener entry created for {@code listener}, or
     *         {@code null} if no listener was given or none was registered
     */
    synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child,
            CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {

        if (parent != null && !inferred) {
            try {
                if (!child.isImmutable(graph)) {
                    child.addParent(parent);
                }
            } catch (DatabaseException e) {
                Logger.defaultLogError(e);
            }
            if (DebugPolicy.DEPENDENCIES) {
                System.out.println(child + " -> " + parent);
            }
        }

        if (listener == null) {
            return null;
        }
        return registerListener(child, listener, procedure);
    }

    /**
     * Registers a listener for the entry unless the listener is already
     * disposed.
     *
     * @return the new listener entry, or {@code null} if the listener is
     *         disposed or an equal, still live listener entry already exists
     */
    public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
        assert (entry != null);
        if (base.isDisposed()) {
            return null;
        }
        return addListener(entry, base, procedure);
    }

    /** Records {@code result} as the last value known to the listener entry. */
    private void primeListenerEntry(final ListenerEntry entry, final Object result) {
        entry.setLastKnown(result);
    }

    /**
     * Adds a listener entry to the entry's listener list. An equal entry
     * whose listener is disposed is replaced; an equal entry whose listener
     * is still live makes this call a no-op returning {@code null}.
     */
    private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
        assert (entry != null);
        assert (procedure != null);

        ArrayList<ListenerEntry> list = listeners.get(entry);
        if (list == null) {
            list = new ArrayList<ListenerEntry>(1);
            listeners.put(entry, list);
        }

        ListenerEntry result = new ListenerEntry(entry, base, procedure);
        int currentIndex = list.indexOf(result);
        if (currentIndex > -1) {
            // There was already a listener
            ListenerEntry current = list.get(currentIndex);
            if (!current.base.isDisposed()) {
                return null;
            }
            list.set(currentIndex, result);
        } else {
            list.add(result);
        }

        if (DebugPolicy.LISTENER) {
            new Exception().printStackTrace();
            System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
        }

        return result;
    }

    /** Appends every element of {@code values}, cast to {@link CacheEntry}, to {@code roots}. */
    private static void addRoots(Collection<?> values, Collection<CacheEntry> roots) {
        for (Object e : values) {
            roots.add((CacheEntry) e);
        }
    }

    /** Like {@link #addRoots}, but reports elements that are not cache entries on stderr instead of failing. */
    private static void addRootsLenient(Collection<?> values, Collection<CacheEntry> roots) {
        for (Object e : values) {
            if (e instanceof CacheEntry) {
                roots.add((CacheEntry) e);
            } else {
                System.err.println("e=" + e);
            }
        }
    }

    /**
     * Collects the entries of the maps this method treats as roots.
     *
     * <p>Each map contributes its entries exactly once.
     *
     * @return a new list of entries
     */
    public Collection<CacheEntry> getRootList() {
        ArrayList<CacheEntry> result = new ArrayList<CacheEntry>();
        addRoots(valueMap.values(), result);
        addRoots(directPredicatesMap.values(), result);
        addRoots(objectsMap.values(), result);
        addRoots(directObjectsMap.values(), result);
        addRoots(principalTypesMap.values(), result);
        addRoots(superRelationsMap.values(), result);
        addRoots(superTypesMap.values(), result);
        addRoots(typesMap.values(), result);
        addRoots(assertedStatementsMap.values(), result);
        addRootsLenient(readMap.values(), result);
        addRootsLenient(asyncReadMap.values(), result);
        addRoots(externalReadMap.values(), result);
        addRoots(orderedSetMap.values(), result);
        return result;
    }

    /**
     * @return the sum of the sizes of all cache maps
     */
    public int calculateCurrentSize() {
        int realSize = 0;
        realSize += directPredicatesMap.size();
        realSize += principalTypesMap.size();
        realSize += uriToResourceMap.size();
        realSize += namespaceIndexMap22.size();
        realSize += projectsMap.size();
        realSize += relationInfoMap.size();
        realSize += superTypesMap.size();
        realSize += typeHierarchyMap.size();
        realSize += superRelationsMap.size();
        realSize += typesMap.size();
        realSize += valueMap.size();
        realSize += directObjectsMap.size();
        realSize += objectsMap.size();
        realSize += orderedSetMap.size();
        realSize += predicatesMap.size();
        realSize += statementsMap.size();
        realSize += assertedPredicatesMap.size();
        realSize += assertedStatementsMap.size();
        realSize += externalReadMap.size();
        realSize += asyncReadMap.size();
        realSize += readMap.size();
        realSize += asyncMultiReadMap.size();
        realSize += multiReadMap.size();
        return realSize;
    }

    /** Adds every entry of {@code values} whose level is at most {@code level} to {@code result}. */
    private static void addUpToLevel(Collection<?> values, int level, CacheCollectionResult result) {
        for (Object o : values) {
            // Cast explicitly so this works while the map fields are raw.
            CacheEntryBase e = (CacheEntryBase) o;
            if (e.getLevel() <= level) {
                result.add(e);
            }
        }
    }

    /**
     * Adds the entries of all cache maps to {@code result}, using
     * {@code Integer.MAX_VALUE} as the level bound.
     *
     * @return {@code result}
     */
    CacheCollectionResult allCaches(CacheCollectionResult result) {
        int level = Integer.MAX_VALUE;
        directPredicatesMap.values(level, result);
        principalTypesMap.values(level, result);
        addUpToLevel(uriToResourceMap.values(), level, result);
        addUpToLevel(namespaceIndexMap22.values(), level, result);
        projectsMap.values(level, result);
        relationInfoMap.values(level, result);
        superTypesMap.values(level, result);
        typeHierarchyMap.values(level, result);
        superRelationsMap.values(level, result);
        typesMap.values(level, result);
        valueMap.values(level, result);
        directObjectsMap.values(level, result);
        objectsMap.values(level, result);
        orderedSetMap.values(level, result);
        predicatesMap.values(level, result);
        statementsMap.values(level, result);
        assertedPredicatesMap.values(level, result);
        assertedStatementsMap.values(level, result);
        externalReadMap.values(level, result);
        asyncReadMap.values(level, result);
        readMap.values(level, result);
        asyncMultiReadMap.values(level, result);
        multiReadMap.values(level, result);
        return result;
    }

    /**
     * Debugging aid: prints the number of entries in all cache maps and, for
     * each entry that is a {@link CacheEntry}, every state flag it has set
     * (pending, excepted, discarded, refuted, fresh).
     *
     * <p>Each map is scanned exactly once, in the same order as
     * {@link #calculateCurrentSize()}.
     */
    @SuppressWarnings("unchecked")
    public void scanPending() {
        ArrayList<Object> entries = new ArrayList<Object>();
        entries.addAll(directPredicatesMap.values());
        entries.addAll(principalTypesMap.values());
        entries.addAll(uriToResourceMap.values());
        entries.addAll(namespaceIndexMap22.values());
        entries.addAll(projectsMap.values());
        entries.addAll(relationInfoMap.values());
        entries.addAll(superTypesMap.values());
        entries.addAll(typeHierarchyMap.values());
        entries.addAll(superRelationsMap.values());
        entries.addAll(typesMap.values());
        entries.addAll(valueMap.values());
        entries.addAll(directObjectsMap.values());
        entries.addAll(objectsMap.values());
        entries.addAll(orderedSetMap.values());
        entries.addAll(predicatesMap.values());
        entries.addAll(statementsMap.values());
        entries.addAll(assertedPredicatesMap.values());
        entries.addAll(assertedStatementsMap.values());
        entries.addAll(externalReadMap.values());
        entries.addAll(asyncReadMap.values());
        entries.addAll(readMap.values());
        entries.addAll(asyncMultiReadMap.values());
        entries.addAll(multiReadMap.values());

        System.out.println(entries.size() + " entries.");

        for (Object e : entries) {
            if (!(e instanceof CacheEntry)) {
                continue;
            }
            CacheEntry en = (CacheEntry) e;
            if (en.isPending()) System.out.println("pending " + e);
            if (en.isExcepted()) System.out.println("excepted " + e);
            if (en.isDiscarded()) System.out.println("discarded " + e);
            if (en.isRefuted()) System.out.println("refuted " + e);
            if (en.isFresh()) System.out.println("fresh " + e);
        }
    }

}