]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/QueryProcessor.java
Merge branch 'private/db-threads' into private/threads-testing
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / QueryProcessor.java
index a6c987a9d0e10e0b2beda36f7a571a8633527063..5945589a331c2460f83e0479f4f56f2b7425ebf2 100644 (file)
@@ -16,6 +16,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -150,7 +151,9 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        QueryThread[]                                   executors;
 
-       public ArrayList<SessionTask>[]                           queues;
+//     public ArrayList<SessionTask>[]                           queues;
+       
+       public LinkedList<SessionTask>                           freeScheduling = new LinkedList<SessionTask>();
 
        enum ThreadState {
 
@@ -159,17 +162,15 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        public ThreadState[]                                                                    threadStates;
-       public ReentrantLock[]                                                                  threadLocks;
-       public Condition[]                                                                          threadConditions;
+//     public ReentrantLock[]                                                                  threadLocks;
+//     public Condition[]                                                                          threadConditions;
 
-       public ArrayList<SessionTask>[]                           ownTasks;
+       //public ArrayList<SessionTask>[]                           ownTasks;
 
-       public ArrayList<SessionTask>[]                           ownSyncTasks;
+       //public ArrayList<SessionTask>[]                           ownSyncTasks;
 
-       ArrayList<SessionTask>[]                           delayQueues;
+       //ArrayList<SessionTask>[]                           delayQueues;
        
-       public boolean synch = true;
-
        final Object querySupportLock;
        
        public Long modificationCounter = 0L;
@@ -177,45 +178,85 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public void close() {
        }
 
-       final public void scheduleOwn(int caller, SessionTask request) {
-               ownTasks[caller].add(request);
+       SessionTask getOwnTask(int thread) {
+               synchronized(querySupportLock) {
+                       int index = 0;
+                       while(index < freeScheduling.size()) {
+                               SessionTask task = freeScheduling.get(index);
+                               if(task.thread == thread && !task.systemCall)
+                                       return freeScheduling.remove(index);
+                               index++;
+                       }
+               }
+               return null;
        }
-
-       final public void scheduleAlways(int caller, SessionTask request) {
-
-               int performer = request.thread;
-               if(caller == performer) {
-                       ownTasks[caller].add(request);
+       
+       public boolean performPending(int thread) {
+               SessionTask task = getOwnTask(thread);
+               if(task != null) {
+                       task.run(thread);
+                       return true;
                } else {
-                       schedule(caller, request);
+                       return false;
                }
-
        }
 
-       final public void schedule(int caller, SessionTask request) {
+//     final public void scheduleOwn(int caller, SessionTask request) {
+//             ownTasks[caller].add(request);
+//     }
+
+       final public void schedule(SessionTask request) {
 
                int performer = request.thread;
 
                if(DebugPolicy.SCHEDULE)
-                       System.out.println("schedule " + request + " " + caller + " -> " + performer);
+                       System.out.println("schedule " + request + " " + " -> " + performer);
 
-               assert(performer >= 0);
+               //assert(performer >= 0);
 
                assert(request != null);
 
-               if(caller == performer) {
-                       request.run(caller);
-               } else {
-                       ReentrantLock queueLock = threadLocks[performer];
-                       queueLock.lock();
-                       queues[performer].add(request);
-                       // This thread could have been sleeping
-                       if(queues[performer].size() == 1) {
-                               if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
-                               threadConditions[performer].signalAll();
-                       }
-                       queueLock.unlock();
-               }
+//             if(caller == performer) {
+//                     request.run(caller);
+//             } else {
+                       
+//                     if(performer == THREADS) {
+                               
+                               synchronized(querySupportLock) {
+                                       
+                                       //new Exception().printStackTrace();
+                                       
+                                       freeScheduling.add(request);
+                                       
+                                       querySupportLock.notifyAll();
+
+                                       //System.err.println("schedule free task " + request + " => " + freeScheduling.size());
+
+//                                     for(int i=0;i<THREADS;i++) {
+//                                             ReentrantLock queueLock = threadLocks[i];
+//                                             queueLock.lock();
+//                                             //queues[performer].add(request);
+//                                             //if(ThreadState.SLEEP == threadStates[i]) sleepers.decrementAndGet();
+//                                             threadConditions[i].signalAll();
+//                                             queueLock.unlock();
+//                                     }
+
+                               }
+
+                               return;
+                               
+//                     }
+//                     
+//                     ReentrantLock queueLock = threadLocks[performer];
+//                     queueLock.lock();
+//                     queues[performer].add(request);
+//                     // This thread could have been sleeping
+//                     if(queues[performer].size() == 1) {
+//                             //if(ThreadState.SLEEP == threadStates[performer]) sleepers.decrementAndGet();
+//                             threadConditions[performer].signalAll();
+//                     }
+//                     queueLock.unlock();
+//             }
 
        }
 
@@ -228,26 +269,28 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public static abstract class SessionTask {
 
                final public int thread;
-               final public int syncCaller;
-               final public Object object;
-
-               public SessionTask(WriteTraits object, int thread) {
-                       this.thread = thread;
-                       this.syncCaller = -1;
-                       this.object = object;
+               final public boolean systemCall;
+//             final public int syncCaller;
+               //final public Object object;
+
+               public SessionTask(boolean systemCall) {
+                       this.thread = QueryProcessor.thread.get();
+                       this.systemCall = systemCall;
+//                     this.syncCaller = -1;
+                       //this.object = object;
                }
 
-               public SessionTask(Object object, int thread, int syncCaller) {
-                       this.thread = thread;
-                       this.syncCaller = syncCaller;
-                       this.object = object;
-               }
+//             public SessionTask(Object object, int syncCaller) {
+//                     this.thread = QueryProcessor.thread.get();
+//                     this.syncCaller = syncCaller;
+//                     this.object = object;
+//             }
 
                public abstract void run(int thread);
 
                @Override
                public String toString() {
-                       return "SessionTask[" + object + "]";
+                       return "SessionTask[" + super.toString() + "]";
                }
 
        }
@@ -257,14 +300,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final public Semaphore notify;
                final public DataContainer<Throwable> throwable; 
 
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread) {
-                       super(object, thread, thread);
-                       this.throwable = throwable;
-                       this.notify = notify;
-               }
-
-               public SessionRead(Object object, DataContainer<Throwable> throwable, Semaphore notify, int thread, int syncThread) {
-                       super(object, thread, syncThread);
+               public SessionRead(DataContainer<Throwable> throwable, Semaphore notify) {
+                       super(true);
                        this.throwable = throwable;
                        this.notify = notify;
                }
@@ -279,10 +316,29 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        public boolean resume(ReadGraphImpl graph) {
                return executors[0].runSynchronized();
        }
+       
+       //private WeakReference<GarbageTracker> garbageTracker;
+       
+       private class GarbageTracker    {
+               
+               @Override
+               protected void finalize() throws Throwable {
+                       
+//                     System.err.println("GarbageTracker");
+//                     
+//                     garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
+                       
+                       super.finalize();
+                       
+               }
+               
+       }
 
        public QueryProcessor(final int threads, QuerySupport core, Set<Thread> threadSet)
                        throws DatabaseException {
 
+               //garbageTracker = new WeakReference<GarbageTracker>(new GarbageTracker());
+               
                THREADS = threads;
                THREAD_MASK = threads - 1;
 
@@ -293,28 +349,28 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                querySupportLock = core.getLock();
 
                executors = new QueryThread[THREADS];
-               queues = new ArrayList[THREADS];
-               threadLocks = new ReentrantLock[THREADS];
-               threadConditions = new Condition[THREADS];
+//             queues = new ArrayList[THREADS];
+//             threadLocks = new ReentrantLock[THREADS];
+//             threadConditions = new Condition[THREADS];
                threadStates = new ThreadState[THREADS];
-               ownTasks = new ArrayList[THREADS];
-               ownSyncTasks = new ArrayList[THREADS];
-               delayQueues = new ArrayList[THREADS * THREADS];
+//             ownTasks = new ArrayList[THREADS];
+//             ownSyncTasks = new ArrayList[THREADS];
+//             delayQueues = new ArrayList[THREADS * THREADS];
 
                //        freeSchedule = new AtomicInteger(0);
 
-               for (int i = 0; i < THREADS * THREADS; i++) {
-                       delayQueues[i] = new ArrayList<SessionTask>();
-               }
+//             for (int i = 0; i < THREADS * THREADS; i++) {
+//                     delayQueues[i] = new ArrayList<SessionTask>();
+//             }
 
                for (int i = 0; i < THREADS; i++) {
 
                        //            tasks[i] = new ArrayList<Runnable>();
-                       ownTasks[i] = new ArrayList<SessionTask>();
-                       ownSyncTasks[i] = new ArrayList<SessionTask>();
-                       queues[i] = new ArrayList<SessionTask>();
-                       threadLocks[i] = new ReentrantLock();
-                       threadConditions[i] = threadLocks[i].newCondition();
+//                     ownTasks[i] = new ArrayList<SessionTask>();
+//                     ownSyncTasks[i] = new ArrayList<SessionTask>();
+//                     queues[i] = new ArrayList<SessionTask>();
+//                     threadLocks[i] = new ReentrantLock();
+//                     threadConditions[i] = threadLocks[i].newCondition();
                        //            limits[i] = false;
                        threadStates[i] = ThreadState.INIT;
 
@@ -552,7 +608,17 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                        });
                } catch (DatabaseException e) {
-                       Logger.defaultLogError(e);
+                   
+                   try {
+                       
+                procedure.exception(graph, e);
+                
+            } catch (DatabaseException e1) {
+                
+                Logger.defaultLogError(e1);
+                
+            }
+                   
                }
 
        }
@@ -1340,7 +1406,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                CacheEntry entry = e.entry;
 
-               System.err.println("updateQuery " + entry);
+               //System.err.println("updateQuery " + entry);
                
                /*
                 * If the dependency graph forms a DAG, some entries are inserted in the
@@ -2324,11 +2390,8 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        
        @Override
        final public IntSet getPredicates(final ReadGraphImpl impl, final Resource subject) throws Throwable {
-               
-               return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null, null); 
-
+               return QueryCacheBase.resultPredicates(impl, querySupport.getId(subject), impl.parent, null); 
        }
-       
 
        @Override
        final public void forEachStatement(final ReadGraphImpl impl, final Resource subject,
@@ -3424,7 +3487,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
                assert(subject != null);
                
-               return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null, null);
+               return QueryCacheBase.resultTypes(impl, querySupport.getId(subject), impl.parent, null);
 
        }
 
@@ -3647,17 +3710,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
        }
 
        final public byte[] getValue(final ReadGraphImpl impl, final int subject) throws DatabaseException {
-               return QueryCache.resultValueQuery(impl, subject, impl.parent, null, null); 
+               return QueryCache.resultValueQuery(impl, subject, impl.parent, null); 
        }
 
        @Override
-       final public byte[] forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
+       final public void forValue(final ReadGraphImpl impl, final Resource subject, final AsyncProcedure<byte[]> procedure) {
 
                assert(subject != null);
+               assert(procedure != null);
 
                int sId = querySupport.getId(subject);
 
-               if(procedure != null) {
+//             if(procedure != null) {
                
                        final ListenerBase listener = getListenerBase(procedure);
 
@@ -3699,23 +3763,18 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 //                     else impl.state.barrier.inc(null, null);
 
                        try {
-                               return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, listener, ip);
+                               QueryCache.runnerValueQuery(impl, sId, impl.parent, listener, ip);
                        } catch (DatabaseException e) {
-                               Logger.defaultLogError(e);
-                       } 
-                       
-
-               } else {
+                               throw new IllegalStateException("Internal error");
+                       }
 
-                       try {
-                               return QueryCacheBase.resultValueQuery(impl, sId, impl.parent, null, null);
-                       } catch (DatabaseException e) {
-                               Logger.defaultLogError(e);
-                       } 
-                       
-               }
-               
-               throw new IllegalStateException("Internal error");
+//             } else {
+//
+//                     return QueryCacheBase.runnerValueQuery(impl, sId, impl.parent, null, null);
+//                     
+//             }
+//             
+//             throw new IllegalStateException("Internal error");
 
        }
 
@@ -3973,7 +4032,7 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                final ListenerBase listener = getListenerBase(procedure);
 
                try {
-                       IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener,QueryCache.emptyProcedureDirectPredicates);
+                       IntSet result = QueryCache.resultDirectPredicates(impl, querySupport.getId(subject), impl.parent, listener);
                        procedure.execute(impl, !result.isEmpty());
                } catch (DatabaseException e) {
                        procedure.exception(impl, e);
@@ -4323,48 +4382,48 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
 
        }
 
-       @Override
-       final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) {
-
-               assert(request != null);
-               assert(procedure != null);
-
-               try {
-               
-                       queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
-       
-                               @Override
-                               public String toString() {
-                                       return procedure.toString();
-                               }
-       
-                               @Override
-                               public void execute(AsyncReadGraph graph, T result) {
-                                       try {
-                                               procedure.execute(result);
-                                       } catch (Throwable t2) {
-                                               Logger.defaultLogError(t2);
-                                       }
-                               }
-
-                               @Override
-                               public void exception(AsyncReadGraph graph, Throwable throwable) {
-                                       try {
-                                               procedure.exception(throwable);
-                                       } catch (Throwable t2) {
-                                               Logger.defaultLogError(t2);
-                                       }
-                               }
-       
-                       });
-                       
-               } catch (DatabaseException e) {
-                       
-                       throw new IllegalStateException(e);
-                       
-               }
-
-       }
+//     @Override
+//     final public <T> void query(final ReadGraphImpl impl, final ExternalRead<T> request, final CacheEntry parent, final Procedure<T> procedure, ListenerBase listener) throws DatabaseException {
+//
+//             assert(request != null);
+//             assert(procedure != null);
+//
+//             try {
+//             
+//                     queryPrimitiveRead(impl, request, parent, listener, new AsyncProcedure<T>() {
+//     
+//                             @Override
+//                             public String toString() {
+//                                     return procedure.toString();
+//                             }
+//     
+//                             @Override
+//                             public void execute(AsyncReadGraph graph, T result) {
+//                                     try {
+//                                             procedure.execute(result);
+//                                     } catch (Throwable t2) {
+//                                             Logger.defaultLogError(t2);
+//                                     }
+//                             }
+//
+//                             @Override
+//                             public void exception(AsyncReadGraph graph, Throwable throwable) {
+//                                     try {
+//                                             procedure.exception(throwable);
+//                                     } catch (Throwable t2) {
+//                                             Logger.defaultLogError(t2);
+//                                     }
+//                             }
+//     
+//                     });
+//                     
+//             } catch (DatabaseException e) {
+//                     
+//                     throw new IllegalStateException(e);
+//                     
+//             }
+//
+//     }
 
        @Override
        public VirtualGraph getProvider(Resource subject, Resource predicate, Resource object) {
@@ -4410,5 +4469,11 @@ final public class QueryProcessor extends AbstractDisposable implements ReadGrap
                }
                return L0;
        }
+
+    public static ThreadLocal<Integer> thread = new ThreadLocal<Integer>() {
+        protected Integer initialValue() {
+            return -1;
+        }
+    };
        
 }