import org.simantics.db.VirtualGraph;
import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
import org.simantics.db.common.utils.Logger;
-import org.simantics.db.debug.ListenerReport;
import org.simantics.db.exception.DatabaseException;
import org.simantics.db.exception.ManyObjectsForFunctionalRelationException;
import org.simantics.db.exception.NoInverseException;
import org.simantics.db.exception.ResourceNotFoundException;
-import org.simantics.db.impl.DebugPolicy;
import org.simantics.db.impl.ResourceImpl;
+import org.simantics.db.impl.graph.BarrierTracing;
import org.simantics.db.impl.graph.ReadGraphImpl;
import org.simantics.db.impl.graph.ReadGraphSupport;
-import org.simantics.db.impl.graph.WriteGraphImpl;
import org.simantics.db.impl.procedure.IntProcedureAdapter;
import org.simantics.db.impl.procedure.InternalProcedure;
import org.simantics.db.impl.procedure.TripleIntProcedureAdapter;
AtomicInteger sleepers = new AtomicInteger(0);
- private boolean updating = false;
+ boolean updating = false;
- private boolean firingListeners = false;
-
final public QueryCache cache;
final public QuerySupport querySupport;
final public Session session;
final public ResourceSupport resourceSupport;
-
- private THashSet<ListenerEntry> scheduledListeners = new THashSet<ListenerEntry>();
+
+ final public Semaphore requests = new Semaphore(1);
+
+ final public QueryListening listening = new QueryListening(this);
QueryThread[] executors;
-
-// public ArrayList<SessionTask>[] queues;
public LinkedList<SessionTask> freeScheduling = new LinkedList<SessionTask>();
+
+ public LinkedList<SessionTask> topLevelTasks = new LinkedList<SessionTask>();
enum ThreadState {
}
public ThreadState[] threadStates;
-// public ReentrantLock[] threadLocks;
-// public Condition[] threadConditions;
-
- //public ArrayList<SessionTask>[] ownTasks;
-
- //public ArrayList<SessionTask>[] ownSyncTasks;
-
- //ArrayList<SessionTask>[] delayQueues;
final Object querySupportLock;
public void close() {
}
- SessionTask getOwnTask(int thread) {
- synchronized(querySupportLock) {
- int index = 0;
- while(index < freeScheduling.size()) {
- SessionTask task = freeScheduling.get(index);
- if(task.thread == thread && !task.systemCall)
- return freeScheduling.remove(index);
- index++;
- }
- }
- return null;
- }
-
- public boolean performPending(int thread) {
- SessionTask task = getOwnTask(thread);
+ public SessionTask getSubTask(ReadGraphImpl parent) {
+ synchronized(querySupportLock) {
+ int index = 0;
+ while(index < freeScheduling.size()) {
+ SessionTask task = freeScheduling.get(index);
+ if(task.isSubtask(parent) && task.maybeReady()) {
+ return freeScheduling.remove(index);
+ }
+ index++;
+ }
+ }
+ return null;
+ }
+
+ /*
+ * We are running errands while waiting for requests to complete.
+ * We can only run work that is part of the current root request to avoid any deadlocks
+ */
+ public boolean performPending(ReadGraphImpl under) {
+ SessionTask task = getSubTask(under);
if(task != null) {
- task.run(thread);
+ task.run(thread.get());
return true;
- } else {
- return false;
}
+ return false;
}
+
+ final public void scheduleNow(SessionTask request) {
+ SessionTask toExecute = scheduleOrReturnForExecution(request);
+ if(toExecute != null)
+ toExecute.run(thread.get());
+ }
-// final public void scheduleOwn(int caller, SessionTask request) {
-// ownTasks[caller].add(request);
-// }
-
- final public void schedule(SessionTask request) {
+ final public SessionTask scheduleOrReturnForExecution(SessionTask request) {
- int performer = request.thread;
+ assert(request != null);
- if(DebugPolicy.SCHEDULE)
- System.out.println("schedule " + request + " " + " -> " + performer);
+ synchronized(querySupportLock) {
- //assert(performer >= 0);
+ LinkedList<SessionTask> queue = request.rootGraph != null ? freeScheduling : topLevelTasks;
+
+ if(BarrierTracing.BOOKKEEPING) {
+ Exception current = new Exception();
+ Exception previous = BarrierTracing.tasks.put(request, current);
+ if(previous != null) {
+ previous.printStackTrace();
+ current.printStackTrace();
+ }
+ }
- assert(request != null);
+ queue.addFirst(request);
+ requests.release();
-// if(caller == performer) {
-// request.run(caller);
-// } else {
-
-// if(performer == THREADS) {
-
- synchronized(querySupportLock) {
-
- //new Exception().printStackTrace();
-
- freeScheduling.add(request);
-
- querySupportLock.notifyAll();
-
- //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
-
-// for(int i=0;i<THREADS;i++) {
-// ReentrantLock queueLock = threadLocks[i];
-// queueLock.lock();
-// //queues[performer].add(request);
-// //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
-// threadConditions[i].signalAll();
-// queueLock.unlock();
-// }
-
- }
+ }
- return;
-
-// }
-//
-// ReentrantLock queueLock = threadLocks[performer];
-// queueLock.lock();
-// queues[performer].add(request);
-// // This thread could have been sleeping
-// if(queues[performer].size() == 1) {
-// //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-// threadConditions[performer].signalAll();
-// }
-// queueLock.unlock();
-// }
+ return null;
- }
+ }
final int THREADS;
final public int THREAD_MASK;
-
- final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
+
+ final public static ThreadGroup QueryThreadGroup = new ThreadGroup("Query Thread Group");
public static abstract class SessionTask {
- final public int thread;
- final public boolean systemCall;
-// final public int syncCaller;
- //final public Object object;
+ final protected ReadGraphImpl rootGraph;
+ private int counter = 0;
+ protected int position = 1;
+ private Exception trace;
- public SessionTask(boolean systemCall) {
- this.thread = QueryProcessor.thread.get();
- this.systemCall = systemCall;
-// this.syncCaller = -1;
- //this.object = object;
+ public SessionTask() {
+ this(null);
}
+
+ public SessionTask(ReadGraphImpl rootGraph) {
+ this.rootGraph = rootGraph;
+ }
+
+ public boolean isSubtask(ReadGraphImpl graph) {
+ return graph.isParent(rootGraph);
+ }
-// public SessionTask(Object object, int syncCaller) {
-// this.thread = QueryProcessor.thread.get();
-// this.syncCaller = syncCaller;
-// this.object = object;
-// }
-
- public abstract void run(int thread);
+ public abstract void run0(int thread);
+
+ public final void run(int thread) {
+ if(counter++ > 0) {
+ if(BarrierTracing.BOOKKEEPING) {
+ trace.printStackTrace();
+ new Exception().printStackTrace();
+ }
+ throw new IllegalStateException("Multiple invocations of SessionTask!");
+ }
+ if(BarrierTracing.BOOKKEEPING) {
+ trace = new Exception();
+ }
+ run0(thread);
+ }
+
+ public boolean maybeReady() {
+ return true;
+ }
@Override
public String toString() {
- return "SessionTask[" + super.toString() + "]";
+ if(rootGraph == null)
+ return "SessionTask[no graph]";
+ else
+ return "SessionTask[" + rootGraph.parent + "]";
}
}
final public DataContainer<Throwable> throwable;
public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
- super(true);
+ super(null);
this.throwable = throwable;
this.notify = notify;
}
}
- long waitingTime = 0;
-
- static int koss = 0;
- static int koss2 = 0;
-
public boolean resume(ReadGraphImpl graph) {
return executors[0].runSynchronized();
}
-
- //private WeakReference<GarbageTracker> garbageTracker;
-
- private class GarbageTracker {
-
- @Override
- protected void finalize() throws Throwable {
-
-// System.err.println("GarbageTracker");
-//
-// garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-
- super.finalize();
-
- }
-
- }
public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
throws DatabaseException {
- //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
-
THREADS = threads;
THREAD_MASK = threads - 1;
querySupportLock = core.getLock();
executors = new QueryThread[THREADS];
-// queues = new ArrayList[THREADS];
-// threadLocks = new ReentrantLock[THREADS];
-// threadConditions = new Condition[THREADS];
threadStates = new ThreadState[THREADS];
-// ownTasks = new ArrayList[THREADS];
-// ownSyncTasks = new ArrayList[THREADS];
-// delayQueues = new ArrayList[THREADS * THREADS];
-
- // freeSchedule = new AtomicInteger(0);
-
-// for (int i = 0; i < THREADS * THREADS; i++) {
-// delayQueues[i] = new ArrayList<SessionTask>();
-// }
for (int i = 0; i < THREADS; i++) {
-
- // tasks[i] = new ArrayList<Runnable>();
-// ownTasks[i] = new ArrayList<SessionTask>();
-// ownSyncTasks[i] = new ArrayList<SessionTask>();
-// queues[i] = new ArrayList<SessionTask>();
-// threadLocks[i] = new ReentrantLock();
-// threadConditions[i] = threadLocks[i].newCondition();
- // limits[i] = false;
threadStates[i] = ThreadState.INIT;
-
}
for (int i = 0; i < THREADS; i++) {
}
final public void releaseWrite(ReadGraphImpl graph) {
- performDirtyUpdates(graph);
+ propagateChangesInQueryCache(graph);
modificationCounter++;
}
boolean isBound(ExternalReadEntry<?> entry) {
if(entry.hasParents()) return true;
- else if(hasListener(entry)) return true;
+ else if(listening.hasListener(entry)) return true;
else return false;
}
- synchronized public ListenerEntry registerDependencies(ReadGraphImpl graph, CacheEntry child, CacheEntry parent, ListenerBase listener, Object procedure, boolean inferred) {
-
- if (parent != null && !inferred) {
- try {
- if(!child.isImmutable(graph))
- child.addParent(parent);
- } catch (DatabaseException e) {
- Logger.defaultLogError(e);
- }
- if(DebugPolicy.DEPENDENCIES) System.out.println(child + " -> " + parent);
- }
-
- if (listener != null) {
- return registerListener(child, listener, procedure);
- } else {
- return null;
- }
-
- }
-
-
static class Dummy implements InternalProcedure<Object>, IntProcedure {
@Override
}
- public ListenerEntry registerListener(final CacheEntry entry, final ListenerBase base, final Object procedure) {
-
- assert (entry != null);
-
- if (base.isDisposed())
- return null;
-
- return addListener(entry, base, procedure);
-
- }
-
- private void primeListenerEntry(final ListenerEntry entry, final Object result) {
- entry.setLastKnown(result);
- }
-
- private ListenerEntry addListener(CacheEntry entry, ListenerBase base, Object procedure) {
-
- assert (entry != null);
- assert (procedure != null);
-
- ArrayList<ListenerEntry> list = cache.listeners.get(entry);
- if (list == null) {
- list = new ArrayList<ListenerEntry>(1);
- cache.listeners.put(entry, list);
- }
-
- ListenerEntry result = new ListenerEntry(entry, base, procedure);
- int currentIndex = list.indexOf(result);
- // There was already a listener
- if(currentIndex > -1) {
- ListenerEntry current = list.get(currentIndex);
- if(!current.base.isDisposed()) return null;
- list.set(currentIndex, result);
- } else {
- list.add(result);
- }
-
- if(DebugPolicy.LISTENER) {
- new Exception().printStackTrace();
- System.out.println("addListener -> " + list.size() + " " + entry + " " + base + " " + procedure);
- }
-
- return result;
-
- }
-
- private void scheduleListener(ListenerEntry entry) {
- assert (entry != null);
- if(DebugPolicy.LISTENER) System.out.println("Scheduled " + entry.procedure);
- scheduledListeners.add(entry);
- }
-
- private void removeListener(ListenerEntry entry) {
- assert (entry != null);
- ArrayList<ListenerEntry> list = cache.listeners.get(entry.entry);
- if(list == null) return;
- boolean success = list.remove(entry);
- assert (success);
- if (list.isEmpty())
- cache.listeners.remove(entry.entry);
- }
-
- private boolean hasListener(CacheEntry entry) {
- if(cache.listeners.get(entry) != null) return true;
- return false;
- }
-
- boolean hasListenerAfterDisposing(CacheEntry entry) {
- if(cache.listeners.get(entry) != null) {
- ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
- ArrayList<ListenerEntry> list = null;
- for (ListenerEntry e : entries) {
- if (e.base.isDisposed()) {
- if(list == null) list = new ArrayList<ListenerEntry>();
- list.add(e);
- }
- }
- if(list != null) {
- for (ListenerEntry e : list) {
- entries.remove(e);
- }
- }
- if (entries.isEmpty()) {
- cache.listeners.remove(entry);
- return false;
- }
- return true;
- }
- return false;
- }
-
- List<ListenerEntry> getListenerEntries(CacheEntry entry) {
- hasListenerAfterDisposing(entry);
- if(cache.listeners.get(entry) != null)
- return cache.listeners.get(entry);
- else
- return Collections.emptyList();
- }
-
- void processListenerReport(CacheEntry<?> entry, Map<CacheEntry, Set<ListenerBase>> workarea) {
-
- if(!workarea.containsKey(entry)) {
-
- HashSet<ListenerBase> ls = new HashSet<ListenerBase>();
- for(ListenerEntry e : getListenerEntries(entry))
- ls.add(e.base);
-
- workarea.put(entry, ls);
-
- for(CacheEntry parent : entry.getParents(this)) {
- processListenerReport(parent, workarea);
- ls.addAll(workarea.get(parent));
- }
-
- }
-
- }
-
- public synchronized ListenerReport getListenerReport() throws IOException {
-
- class ListenerReportImpl implements ListenerReport {
-
- Map<CacheEntry, Set<ListenerBase>> workarea = new HashMap<CacheEntry, Set<ListenerBase>>();
-
- @Override
- public void print(PrintStream b) {
- Map<ListenerBase, Integer> hist = new HashMap<ListenerBase, Integer>();
- for(Map.Entry<CacheEntry, Set<ListenerBase>> e : workarea.entrySet()) {
- for(ListenerBase l : e.getValue()) {
- Integer i = hist.get(l);
- hist.put(l, i != null ? i-1 : -1);
- }
- }
-
- for(Pair<ListenerBase, Integer> p : CollectionUtils.valueSortedEntries(hist)) {
- b.print("" + -p.second + " " + p.first + "\n");
- }
-
- b.flush();
- }
-
- }
-
- ListenerReportImpl result = new ListenerReportImpl();
-
- Collection<CacheEntryBase> all = allCaches(new CacheCollectionResult()).toCollection();
- for(CacheEntryBase entry : all) {
- hasListenerAfterDisposing(entry);
- }
- for(CacheEntryBase entry : all) {
- processListenerReport(entry, result.workarea);
- }
-
- return result;
-
- }
-
- public synchronized String reportListeners(File file) throws IOException {
-
- if (!isAlive())
- return "Disposed!";
-
- PrintStream b = new PrintStream(new BufferedOutputStream(new FileOutputStream(file)));
- ListenerReport report = getListenerReport();
- report.print(b);
-
- return "Done reporting listeners.";
-
- }
void processParentReport(CacheEntry entry, Map<CacheEntry, Set<CacheEntry>> workarea) {
int listeners = 0;
for(CacheEntry entry : workarea.keySet()) {
- boolean listener = hasListenerAfterDisposing(entry);
+ boolean listener = listening.hasListenerAfterDisposing(entry);
boolean hasParents = entry.getParents(this).iterator().hasNext();
if(listener) {
// Bound
for(CacheEntry entry : workarea.keySet()) {
Class<?> clazz = entry.getClass();
- if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).request.getClass();
- else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).request.getClass();
- else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).request.getClass();
- else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).request.getClass();
- else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).request.getClass();
+ if(entry instanceof ReadEntry) clazz = ((ReadEntry)entry).id.getClass();
+ else if(entry instanceof MultiReadEntry) clazz = ((MultiReadEntry)entry).id.getClass();
+ else if(entry instanceof AsyncReadEntry) clazz = ((AsyncReadEntry)entry).id.getClass();
+ else if(entry instanceof AsyncMultiReadEntry) clazz = ((AsyncMultiReadEntry)entry).id.getClass();
+ else if(entry instanceof ExternalReadEntry) clazz = ((ExternalReadEntry)entry).id.getClass();
Integer c = counts.get(clazz);
if(c == null) counts.put(clazz, -1);
else counts.put(clazz, c-1);
b.print("// Entry parent listing\n");
for(CacheEntry entry : workarea.keySet()) {
int status = flagMap.get(entry);
- boolean hasListener = hasListenerAfterDisposing(entry);
+ boolean hasListener = listening.hasListenerAfterDisposing(entry);
b.print("Q " + entry.toString());
if(hasListener) {
b.print(" (L" + status + ")");
}
- class UpdateEntry {
-
- public CacheEntry caller;
-
- public CacheEntry entry;
-
- public int indent;
-
- public UpdateEntry(CacheEntry caller, CacheEntry entry, int indent) {
- this.caller = caller;
- this.entry = entry;
- this.indent = indent;
- }
-
- };
-
boolean removeQuery(CacheEntry entry) {
// This entry has been removed before. No need to do anything here.
CacheEntry entry = e.entry;
- //System.err.println("updateQuery " + entry);
-
/*
* If the dependency graph forms a DAG, some entries are inserted in the
* todo list many times. They only need to be processed once though.
if (entry.isDiscarded()) {
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.out.print("D");
+ System.err.print("D");
for (int i = 0; i < e.indent; i++)
- System.out.print(" ");
- System.out.println(entry.getQuery());
+ System.err.print(" ");
+ System.err.println(entry.getQuery());
}
}
// System.err.println(" => DISCARDED");
return false;
}
- if (entry.isRefuted()) {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.out.print("R");
- for (int i = 0; i < e.indent; i++)
- System.out.print(" ");
- System.out.println(entry.getQuery());
- }
- }
- return false;
- }
+// if (entry.isRefuted()) {
+// if (Development.DEVELOPMENT) {
+// if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+// System.err.print("R");
+// for (int i = 0; i < e.indent; i++)
+// System.err.print(" ");
+// System.err.println(entry.getQuery());
+// }
+// }
+// return false;
+// }
if (entry.isExcepted()) {
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.out.print("E");
+ System.err.print("E");
}
}
}
if (entry.isPending()) {
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.out.print("P");
+ System.err.print("P");
}
}
}
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.out.print("U ");
+ System.err.print("U ");
for (int i = 0; i < e.indent; i++)
- System.out.print(" ");
- System.out.print(entry.getQuery());
+ System.err.print(" ");
+ System.err.print(entry.getQuery());
}
}
Query query = entry.getQuery();
int type = query.type();
- boolean hasListener = hasListener(entry);
+ boolean hasListener = listening.hasListener(entry);
if (Development.DEVELOPMENT) {
if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- if(hasListener(entry)) {
- System.out.println(" (L)");
+ if(listening.hasListener(entry)) {
+ System.err.println(" (L)");
} else {
- System.out.println("");
+ System.err.println("");
}
}
}
// System.err.println(" => FOO " + type);
if (hasListener) {
- ArrayList<ListenerEntry> entries = cache.listeners.get(entry);
+ ArrayList<ListenerEntry> entries = listening.listeners.get(entry);
if(entries != null) {
for (ListenerEntry le : entries) {
- scheduleListener(le);
+ listening.scheduleListener(le);
}
}
}
// If invalid, update parents
if (type == RequestFlags.INVALIDATE) {
- updateParents(e.indent, entry, todo);
+ listening.updateParents(e.indent, entry, todo);
}
return hasListener;
}
- private void updateParents(int indent, CacheEntry entry, LinkedList<UpdateEntry> todo) {
-
- Iterable<CacheEntry> oldParents = entry.getParents(this);
- for (CacheEntry parent : oldParents) {
-// System.err.println("updateParents " + entry + " => " + parent);
- if(!parent.isDiscarded())
- todo.push(new UpdateEntry(entry, parent, indent + 2));
- }
-
- }
-
- private boolean pruneListener(ListenerEntry entry) {
- if (entry.base.isDisposed()) {
- removeListener(entry);
- return true;
- } else {
- return false;
- }
- }
-
/**
* @param av1 an array (guaranteed)
* @param av2 any object
- final private Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
+ final Object compareTo(ReadGraphImpl graph, final CacheEntry entry, final Object oldValue) {
try {
Query query = entry.getQuery();
- if(DebugPolicy.RECOMPUTE) System.out.println("R " + query);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_RECOMPUTE, Bindings.BOOLEAN)) {
+ System.err.println("R " + query);
+ }
+ }
entry.prepareRecompute(querySupport);
- ReadGraphImpl parentGraph = graph.withParent(entry);
+ ReadGraphImpl parentGraph = graph.forRecompute(entry);
query.recompute(parentGraph);
Object newValue = entry.getResult();
if (ListenerEntry.NO_VALUE == oldValue) {
- if(DebugPolicy.CHANGES) {
- System.out.println("C " + query);
- System.out.println("- " + oldValue);
- System.out.println("- " + newValue);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
+ System.out.println("C " + query);
+ System.out.println("- " + oldValue);
+ System.out.println("- " + newValue);
+ }
}
return newValue;
}
} else
changed = (oldValue != null);
- if(DebugPolicy.CHANGES && changed) {
- System.out.println("C " + query);
- System.out.println("- " + oldValue);
- System.out.println("- " + newValue);
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_CHANGES, Bindings.BOOLEAN)) {
+ System.err.println("C " + query);
+ System.err.println("- " + oldValue);
+ System.err.println("- " + newValue);
+ }
}
return changed ? newValue : ListenerEntry.NOT_CHANGED;
}
- public boolean hasScheduledUpdates() {
- return !scheduledListeners.isEmpty();
- }
-
- public void performScheduledUpdates(WriteGraphImpl graph) {
-
- assert (!updating);
- assert (!cache.collecting);
- assert (!firingListeners);
-
- firingListeners = true;
-
- try {
-
- // Performing may cause further events to be scheduled.
- while (!scheduledListeners.isEmpty()) {
-
-// graph.restart();
-// graph.state.barrier.inc();
-
- // Clone current events to make new entries possible during
- // firing.
- THashSet<ListenerEntry> entries = scheduledListeners;
- scheduledListeners = new THashSet<ListenerEntry>();
-
- ArrayList<ListenerEntry> schedule = new ArrayList<ListenerEntry>();
-
- for (ListenerEntry listenerEntry : entries) {
-
- if (pruneListener(listenerEntry)) {
- if(DebugPolicy.LISTENER) System.out.println("Pruned " + listenerEntry.procedure);
- continue;
- }
-
- final CacheEntry entry = listenerEntry.entry;
- assert (entry != null);
-
- Object newValue = compareTo(graph, entry, listenerEntry.getLastKnown());
-
- if (newValue != ListenerEntry.NOT_CHANGED) {
- if(DebugPolicy.LISTENER)
- System.out.println("Add to schedule " + listenerEntry.procedure + " with " + newValue);
- schedule.add(listenerEntry);
- listenerEntry.setLastKnown(entry.getResult());
- }
-
- }
-
- for(ListenerEntry listenerEntry : schedule) {
- final CacheEntry entry = listenerEntry.entry;
- if(DebugPolicy.LISTENER)
- System.out.println("Firing " + listenerEntry.procedure);
- try {
- if(DebugPolicy.LISTENER)
- System.out.println("Firing " + listenerEntry.procedure + " for " + listenerEntry.entry);
- entry.performFromCache(graph, listenerEntry.procedure);
- } catch (Throwable t) {
- t.printStackTrace();
- }
- }
-
-// graph.state.barrier.dec();
-// graph.waitAsync(null);
-// graph.state.barrier.assertReady();
-
- }
-
- } finally {
- firingListeners = false;
- }
-
- }
/**
*
Object newValue = compareTo(graph, immediate, ListenerEntry.NO_VALUE);
if (newValue != ListenerEntry.NOT_CHANGED)
- updateParents(0, immediate, todo);
+ listening.updateParents(0, immediate, todo);
} else {
Object newValue = compareTo(graph, immediate, oldValue);
if (newValue != ListenerEntry.NOT_CHANGED) {
- updateParents(0, immediate, todo);
+ listening.updateParents(0, immediate, todo);
} else {
// If not changed, keep the old value
immediate.setResult(oldValue);
+ immediate.setReady();
listenersUnknown = true;
}
private Object primitiveUpdateLock = new Object();
private THashSet scheduledPrimitiveUpdates = new THashSet();
- public void performDirtyUpdates(final ReadGraphImpl graph) {
+ private ArrayList<CacheEntry> refutations = new ArrayList<>();
+
+ private void markForUpdate(ReadGraphImpl graph, CacheEntry e) {
+ e.refute();
+ refutations.add(e);
+ }
+
+ private void updateRefutations(ReadGraphImpl graph) {
+
+ for(CacheEntry e : refutations)
+ update(graph, e);
+
+ refutations.clear();
+
+ }
+
+ public void propagateChangesInQueryCache(final ReadGraphImpl graph) {
+
+ // Make sure that listening has performed its work
+ listening.sync();
cache.dirty = false;
lastInvalidate = 0;
final int subject = (int)(arg0 >>> 32);
final int predicate = (int)(arg0 & 0xffffffff);
- for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
- for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
- for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
+ for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
- if(principalTypes != null) update(graph, principalTypes);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
- if(types != null) update(graph, types);
+ if(types != null) markForUpdate(graph, types);
}
if(predicate == subrelationOf) {
SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
- if(superRelations != null) update(graph, superRelations);
+ if(superRelations != null) markForUpdate(graph, superRelations);
}
DirectPredicates dp = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
- if(dp != null) update(graph, dp);
+ if(dp != null) markForUpdate(graph, dp);
OrderedSet os = QueryCache.entryOrderedSet(QueryProcessor.this, predicate);
- if(os != null) update(graph, os);
+ if(os != null) markForUpdate(graph, os);
+ updateRefutations(graph);
+
scheduledObjectUpdates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
+
return;
}
int arg0 = scheduledValueUpdates.getFirst();
ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
- if(valueQuery != null) update(graph, valueQuery);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
+
+ updateRefutations(graph);
scheduledValueUpdates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
+
return;
}
scheduledPrimitiveUpdates = new THashSet();
}
- primitiveUpdates.forEach(new TObjectProcedure() {
-
- @Override
- public boolean execute(Object arg0) {
-
- ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
- if (query != null) {
- boolean listening = update(graph, query);
- if (!listening && !query.hasParents()) {
- cache.externalReadEntryMap.remove(arg0);
- query.discard();
- }
- }
- return true;
- }
-
- });
-
scheduledValueUpdates.forEach(new TIntProcedure() {
@Override
public boolean execute(int arg0) {
ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, arg0);
- if(valueQuery != null) update(graph, valueQuery);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
return true;
}
public boolean execute(int resource) {
ValueQuery valueQuery = QueryCache.entryValueQuery(QueryProcessor.this, resource);
- if(valueQuery != null) update(graph, valueQuery);
+ if(valueQuery != null) markForUpdate(graph, valueQuery);
PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, resource);
- if(principalTypes != null) update(graph, principalTypes);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
Types types = QueryCache.entryTypes(QueryProcessor.this, resource);
- if(types != null) update(graph, types);
+ if(types != null) markForUpdate(graph, types);
SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, resource);
- if(superRelations != null) update(graph, superRelations);
+ if(superRelations != null) markForUpdate(graph, superRelations);
predicates.add(resource);
if(predicate == instanceOf || predicate == inherits || predicate == subrelationOf) {
PrincipalTypes principalTypes = QueryCache.entryPrincipalTypes(QueryProcessor.this, subject);
- if(principalTypes != null) update(graph, principalTypes);
+ if(principalTypes != null) markForUpdate(graph, principalTypes);
Types types = QueryCache.entryTypes(QueryProcessor.this, subject);
- if(types != null) update(graph, types);
+ if(types != null) markForUpdate(graph, types);
}
if(predicate == subrelationOf) {
SuperRelations superRelations = SuperRelations.entry(QueryProcessor.this, subject);
- if(superRelations != null) update(graph, superRelations);
+ if(superRelations != null) markForUpdate(graph, superRelations);
}
predicates.add(subject);
@Override
public boolean execute(final int subject) {
- for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) update(graph, o);
- for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) update(graph, o);
- for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) update(graph, o);
+ for(Objects o : QueryCache.entriesObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(DirectObjects o : QueryCache.entriesDirectObjects(QueryProcessor.this, subject)) markForUpdate(graph, o);
+ for(Statements o : QueryCache.entriesStatements(QueryProcessor.this, subject)) markForUpdate(graph, o);
DirectPredicates entry = QueryCache.entryDirectPredicates(QueryProcessor.this, subject);
- if(entry != null) update(graph, entry);
+ if(entry != null) markForUpdate(graph, entry);
return true;
public boolean execute(int orderedSet) {
OrderedSet entry = QueryCache.entryOrderedSet(QueryProcessor.this, orderedSet);
- if(entry != null) update(graph, entry);
+ if(entry != null) markForUpdate(graph, entry);
return true;
});
- // for (Integer subject : predicates) {
- // DirectPredicates entry = DirectPredicates.entry(QueryProcessor.this, subject);
- // if(entry != null) update(graph, entry);
- // }
+ updateRefutations(graph);
+ primitiveUpdates.forEach(new TObjectProcedure() {
- if (Development.DEVELOPMENT) {
- if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
- System.err.println("== Query update ends ==");
+ @Override
+ public boolean execute(Object arg0) {
+
+ ExternalReadEntry query = (ExternalReadEntry)cache.externalReadEntryMap.get(arg0);
+ if (query != null) {
+ boolean listening = update(graph, query);
+ if (!listening && !query.hasParents()) {
+ cache.externalReadEntryMap.remove(arg0);
+ query.discard();
+ }
+ }
+ return true;
}
- }
+ });
+
scheduledValueUpdates.clear();
scheduledObjectUpdates.clear();
scheduledInvalidates.clear();
+
+ if (Development.DEVELOPMENT) {
+ if(Development.<Boolean>getProperty(DevelopmentKeys.QUERYPROCESSOR_UPDATE, Bindings.BOOLEAN)) {
+ System.err.println("== Query update ends ==");
+ }
+ }
}
@Override
protected void doDispose() {
+ requests.release(Integer.MAX_VALUE / 2);
+
for(int index = 0; index < THREADS; index++) {
executors[index].dispose();
}
@Override
final public void forEachPredicate(final ReadGraphImpl impl, final Resource subject, final AsyncMultiProcedure<Resource> procedure) {
- throw new UnsupportedOperationException();
+ try {
-// assert(subject != null);
-// assert(procedure != null);
-//
-// final ListenerBase listener = getListenerBase(procedure);
-//
-// IntProcedure ip = new IntProcedure() {
-//
-// AtomicBoolean first = new AtomicBoolean(true);
-//
-// @Override
-// public void execute(ReadGraphImpl graph, int i) {
-// try {
-// if(first.get()) {
-// procedure.execute(graph, querySupport.getResource(i));
-// } else {
-// procedure.execute(impl.newRestart(graph), querySupport.getResource(i));
-// }
-// } catch (Throwable t2) {
-// Logger.defaultLogError(t2);
-// }
-// }
-//
-// @Override
-// public void finished(ReadGraphImpl graph) {
-// try {
-// if(first.compareAndSet(true, false)) {
-// procedure.finished(graph);
-//// impl.state.barrier.dec(this);
-// } else {
-// procedure.finished(impl.newRestart(graph));
-// }
-//
-// } catch (Throwable t2) {
-// Logger.defaultLogError(t2);
-// }
-// }
-//
-// @Override
-// public void exception(ReadGraphImpl graph, Throwable t) {
-// try {
-// if(first.compareAndSet(true, false)) {
-// procedure.exception(graph, t);
-// } else {
-// procedure.exception(impl.newRestart(graph), t);
-// }
-// } catch (Throwable t2) {
-// Logger.defaultLogError(t2);
-// }
-// }
-//
-// };
-//
-// int sId = querySupport.getId(subject);
-//
-// try {
-// QueryCache.runnerPredicates(impl, sId, impl.parent, listener, ip);
-// } catch (DatabaseException e) {
-// Logger.defaultLogError(e);
-// }
+ for(Resource predicate : getPredicates(impl, subject))
+ procedure.execute(impl, predicate);
+
+ procedure.finished(impl);
+
+ } catch (Throwable e) {
+ procedure.exception(impl, e);
+ }
}