]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/SuperRelations.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / SuperRelations.java
index e76f702e14f54ca8b1a379ba255ca8d4e69f229c..1f540062e9440d2db81164ee820c05566f4a55c0 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
  *******************************************************************************/
 package org.simantics.db.impl.query;
 
-import gnu.trove.procedure.TIntProcedure;
-import gnu.trove.set.hash.TIntHashSet;
-
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.simantics.db.common.utils.Logger;
+import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.impl.procedure.InternalProcedure;
-import org.simantics.db.procedure.ListenerBase;
-
-final public class SuperRelations extends UnaryQuery<InternalProcedure<IntSet>> {
-       
-//     public ArrayList<InternalProcedure<IntSet>> procs = null;
-    
-    private SuperRelations(final int resource) {
-        super(resource);
-    }
-    
-    final static SuperRelations entry(final QueryProcessor provider, final int r) {
-        
-        return (SuperRelations)provider.superRelationsMap.get(r);
-
-    }
 
-    final static IntSet runner(ReadGraphImpl graph, final int r, final QueryProcessor provider, final CacheEntry parent, final ListenerBase listener, final InternalProcedure<IntSet> procedure) {
-
-        SuperRelations entry = (SuperRelations)provider.superRelationsMap.get(r);
-        if(entry == null) {
-
-               entry = new SuperRelations(r);
-               entry.setPending();
-               entry.clearResult(provider.querySupport);
-               entry.putEntry(provider);
-               
-               return (IntSet)provider.performForEach(graph, entry, parent, listener, procedure);
-            
-        } else {
-               
-            if(!entry.isReady()) {
-                throw new IllegalStateException();
-            }
-            return (IntSet)provider.performForEach(graph, entry, parent, listener, procedure);
-            
-        }
-
-    }
-    
-    final static IntSet runner2(ReadGraphImpl graph, final int r, final QueryProcessor provider, final CacheEntry parent, final ListenerBase listener, final InternalProcedure<IntSet> procedure) throws Throwable {
+import gnu.trove.procedure.TIntProcedure;
+import gnu.trove.set.hash.TIntHashSet;
 
-        SuperRelations entry = (SuperRelations)provider.superRelationsMap.get(r);
-        if(entry == null) {
+public final class SuperRelations extends UnaryQueryP<IntSet> {
 
-            entry = new SuperRelations(r);
-            entry.setPending();
-            entry.clearResult(provider.querySupport);
-            entry.putEntry(provider);
-            
-            return (IntSet)provider.performForEach2(graph, entry, parent, listener, procedure);
-            
-        } else {
-            
-            if(!entry.isReady()) {
-                throw new IllegalStateException();
-            }
-            return (IntSet)provider.performForEach2(graph, entry, parent, listener, procedure);
-            
-        }
-
-    }    
-    
-    final public static void queryEach(ReadGraphImpl graph, final int r, final QueryProcessor provider, final CacheEntry parent, final ListenerBase listener, final InternalProcedure<IntSet> procedure) {
-
-        if(parent == null && listener == null) {
-               SuperRelations entry = (SuperRelations)provider.superRelationsMap.get(r);
-               if(entry != null && entry.isReady()) {
-                   entry.performFromCache(graph, provider, procedure);
-                   return;
-               }
-        }
-        
-        runner(graph, r, provider, parent, listener, procedure);
-         
+    SuperRelations(final int resource) {
+        super(resource);
     }
 
-    final public static IntSet queryEach2(ReadGraphImpl graph, final int r, final QueryProcessor provider, final CacheEntry parent, final ListenerBase listener, final InternalProcedure<IntSet> procedure) throws Throwable {
-
-        if(parent == null && listener == null) {
-            SuperRelations entry = (SuperRelations)provider.superRelationsMap.get(r);
-            if(entry != null && entry.isReady()) {
-                return (IntSet)entry.get(graph, provider, procedure);
-            }
-        }
-        
-        return runner2(graph, r, provider, parent, listener, procedure);
-         
+    static final SuperRelations entry(final QueryProcessor provider, final int r) {
+        return (SuperRelations)provider.cache.superRelationsMap.get(r);
     }
-    
-       @Override
-       public UnaryQuery<InternalProcedure<IntSet>> getEntry(QueryProcessor provider) {
-        return provider.superRelationsMap.get(id);
-       }
-       
-       @Override
-       public void putEntry(QueryProcessor provider) {
-        provider.superRelationsMap.put(id, this);
-       }
 
        @Override
-       final public void removeEntry(QueryProcessor provider) {
-               provider.superRelationsMap.remove(id);
+       public final void removeEntry(QueryProcessor provider) {
+               provider.cache.remove(this);
        }
        
-       static int histoCounter = 0;
-       static IntSet EMPTY_SET = new IntSet();
-    static int counter = 0;
-    
-    class Koss {
+    static class Ints {
         
         private TIntHashSet set = null;
         public int single = 0;
@@ -157,67 +66,75 @@ final public class SuperRelations extends UnaryQuery<InternalProcedure<IntSet>>
         
     }
 
-       @Override
-       public Object computeForEach(final ReadGraphImpl graph, final QueryProcessor provider, final InternalProcedure<IntSet> procedure, final boolean store) {
+       public void compute(final ReadGraphImpl graph, final InternalProcedure<IntSet> procedure) throws DatabaseException {
+           computeForEach(graph, id, this, procedure);
+       }
 
-               provider.querySupport.ensureLoaded(graph, id);
+    static class DirectProcedure extends Ints implements IntProcedure, TIntProcedure, InternalProcedure<IntSet> {
+        
+        IntSet result;
+        InternalProcedure<IntSet> proc;
+        
+        public DirectProcedure(IntSet result, InternalProcedure<IntSet> proc) {
+            this.result = result;
+            this.proc = proc;
+        }
+        
+        @Override
+        final public boolean execute(int r) {
+            result.add(r);
+            return true;
+        }
+        @Override
+        final public void execute(ReadGraphImpl graph, int r) {
+            if(single == 0) {
+                single = r;
+                return;
+            }
+            add(r);
+        }
+        @Override
+        public final void execute(ReadGraphImpl graph, IntSet set) throws DatabaseException {
+            set.forEach(this);
+            proc.execute(graph, result);
+        }
+        @Override
+        public void finished(ReadGraphImpl graph) {
+        }
+        @Override
+        public void exception(ReadGraphImpl graph, Throwable t) {
+            throw new Error("Errors are not supported.", t);
+        }
+
+    }
+
+       public static Object computeForEach(final ReadGraphImpl graph, int id, SuperRelations entry, final InternalProcedure<IntSet> procedure_) throws DatabaseException {
+           
+           InternalProcedure<IntSet> procedure = entry != null ? entry : procedure_;
+
+           QueryProcessor processor = graph.processor;
+               
+               processor.querySupport.ensureLoaded(graph, id);
                
            final InternalProcedure<IntSet> proc = (InternalProcedure<IntSet>)procedure;
 
-           final int subrelationOf = provider.getSubrelationOf();
-
-           final IntSet result = new IntSet(provider.querySupport);
-
-           final class DirectProcedure extends Koss implements IntProcedure, TIntProcedure, InternalProcedure<IntSet> {
-               @Override
-               final public boolean execute(int r) {
-                       result.add(r);
-                       return true;
-               }
-               @Override
-               final public void execute(ReadGraphImpl graph, int r) {
-                       if(single == 0) {
-                               single = r;
-                               return;
-                       }
-                       add(r);
-               }
-               @Override
-               final public void execute(ReadGraphImpl graph, IntSet set) {
-                       set.forEach(this);
-                       addOrSet(graph, result, provider);
-                       proc.execute(graph, result);
-               }
-               @Override
-               public void finished(ReadGraphImpl graph) {
-               }
-               @Override
-               public void exception(ReadGraphImpl graph, Throwable t) {
-                       throw new Error("Errors are not supported.", t);
-               }
+           final int subrelationOf = processor.getSubrelationOf();
 
-           }
+           final IntSet result = new IntSet(processor.querySupport);
 
-           final DirectProcedure directProc = new DirectProcedure();
+           final DirectProcedure directProc = new DirectProcedure(result, proc);
 
-           provider.querySupport.getObjects(graph, id, subrelationOf, directProc);
+          processor.querySupport.getObjects(graph, id, subrelationOf, directProc);
 
            int size = directProc.size();
 
            if(size == 0) {
-
-               addOrSet(graph, EMPTY_SET, provider);
-               proc.execute(graph, EMPTY_SET);
-
+               proc.execute(graph, IntSet.EMPTY);
            } else if (size == 1) {
-
                result.add(directProc.single);
-               SuperRelations.queryEach(graph, directProc.single, provider, SuperRelations.this, null, directProc);
-
+               QueryCache.runnerSuperRelations(graph, directProc.single, entry, null, directProc);
            } else {
 
-               //                  if((counter++ % 500) == 0) System.out.println("SR " + counter);
-
                final TIntProcedure addToResult = new TIntProcedure() {
                        @Override
                        public boolean execute(int r) {
@@ -234,26 +151,34 @@ final public class SuperRelations extends UnaryQuery<InternalProcedure<IntSet>>
 
                        @Override
                        public boolean execute(int arg0) {
+                               try {
+                                               return execute0(arg0);
+                                       } catch (DatabaseException e) {
+                                               Logger.defaultLogError(e);
+                                       }
+                               return false;
+                       }
+
+                       public boolean execute0(int arg0) throws DatabaseException {
 
                                synchronized(result) {
                                        result.add(arg0);
                                }
 
-                               SuperRelations.queryEach(graph, arg0, provider, SuperRelations.this, null, new InternalProcedure<IntSet>() {
+                               QueryCache.runnerSuperRelations(graph, arg0, entry, null, new InternalProcedure<IntSet>() {
 
                                        @Override
-                                       public void execute(ReadGraphImpl graph, IntSet set) {
+                                       public void execute(ReadGraphImpl graph, IntSet set) throws DatabaseException {
                                                set.forEach(addToResult);
                                                int current = finishes.addAndGet(1);
                                                if(current == directProc.size()) {
-                                                       addOrSet(graph, result, provider);
                                                        proc.execute(graph, result);
                                                        return; 
                                                }
                                        }
 
                                        @Override
-                                       public void exception(ReadGraphImpl graph, Throwable t) {
+                                       public void exception(ReadGraphImpl graph, Throwable t) throws DatabaseException {
                                                proc.exception(graph, t);
                                        }
 
@@ -267,6 +192,8 @@ final public class SuperRelations extends UnaryQuery<InternalProcedure<IntSet>>
 
            }
            
+           if(entry != null) entry.performFromCache(graph, procedure_);
+            
            return result;
         
     }
@@ -275,77 +202,5 @@ final public class SuperRelations extends UnaryQuery<InternalProcedure<IntSet>>
     public String toString() {
        return "SuperRelations[" + id + "]";
     }
-
-    private void addOrSet(ReadGraphImpl graph, final IntSet value, QueryProcessor provider) {
-
-        assert(!isReady());
-
-//        ArrayList<InternalProcedure<IntSet>> p = null;
-
-        synchronized(this) {
-        
-            value.trim();
-            setResult(value);
-            setReady();
-//            p = procs;
-//            procs = null; 
-        
-        }
-
-//        if(p != null) {
-//             IntSet v = (IntSet)getResult();
-//             if(v != null) {
-//                 for(InternalProcedure<IntSet> proc : p) proc.execute(graph, v);
-//             }
-//        }
-        
-    }
-    
-    @Override
-    public Object performFromCache(ReadGraphImpl graph, QueryProcessor provider, InternalProcedure<IntSet> procedure) {
-              
-        assert(isReady());
-
-       if(handleException(graph, procedure)) return null;
-        
-       IntSet result = getResult();
-       
-        procedure.execute(graph, result);
-        
-        return result;
-        
-    }
-
-    @Override
-    public void recompute(ReadGraphImpl graph, QueryProcessor provider) {
-        
-        final Semaphore s = new Semaphore(0);
-
-        computeForEach(graph, provider, new InternalProcedure<IntSet>() {
-
-               @Override
-               public void execute(ReadGraphImpl graph, IntSet result) {
-                s.release();
-               }
-
-            @Override
-            public void exception(ReadGraphImpl graph, Throwable t) {
-               s.release();
-                new Error("Error in recompute.", t).printStackTrace();
-            }
-
-        }, true);
-
-        while(!s.tryAcquire()) {
-               provider.resume(graph);
-        }
-        
-    }
-    
-
-    @Override
-    boolean isImmutable(ReadGraphImpl graph) {
-       return graph.processor.isImmutable(id);
-    }
     
 }