]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.impl/src/org/simantics/db/impl/query/AsyncMultiReadEntry.java
Multiple reader thread support for db client
[simantics/platform.git] / bundles / org.simantics.db.impl / src / org / simantics / db / impl / query / AsyncMultiReadEntry.java
index 8a8b18b7dda5641113207102d8c43f54c5608716..24f8e5375f65660a41e08396e97a393b5ce4b5b1 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -14,19 +14,22 @@ package org.simantics.db.impl.query;
 import java.util.ArrayList;
 
 import org.simantics.db.AsyncReadGraph;
-import org.simantics.db.common.exception.DebugException;
+import org.simantics.db.exception.DatabaseException;
+import org.simantics.db.impl.BlockingAsyncMultiProcedure;
 import org.simantics.db.impl.graph.ReadGraphImpl;
 import org.simantics.db.procedure.AsyncMultiProcedure;
 import org.simantics.db.request.AsyncMultiRead;
 import org.simantics.db.request.RequestFlags;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-final public class AsyncMultiReadEntry<T> extends CacheEntryBase {
+final public class AsyncMultiReadEntry<T> extends CacheEntryBase<AsyncMultiProcedure<T>> {
 
-//    public ArrayList<AsyncMultiProcedure<T>> procs = null;
+    private static final Logger LOGGER = LoggerFactory.getLogger(AsyncMultiReadEntry.class);
 
     protected AsyncMultiRead<T> request;
     
-    public AsyncMultiReadEntry(AsyncMultiRead<T> request) {
+    AsyncMultiReadEntry(AsyncMultiRead<T> request) {
        this.request = request;
     }
     
@@ -49,58 +52,26 @@ final public class AsyncMultiReadEntry<T> extends CacheEntryBase {
     
     final synchronized public void finish(AsyncReadGraph graph) {
        
-//     new Exception("finish " + this).printStackTrace();
-       
-       if(!isPending()) {
-               System.err.println("aff");
-       }
-       
        assert(isPending());
 
-//        ArrayList<AsyncMultiProcedure<T>> p = null;
-
         synchronized(this) {
-
                setReady();
-//            p = procs;
-//            procs = null; 
-        
         }
         
-//        if(p != null) {
-//             ArrayList<T> v = (ArrayList<T>)getResult();
-//             if(v != null) {
-//                 for(AsyncMultiProcedure<T> proc : p) {
-//                     for(T t : v) proc.execute(graph, t);
-//                 }
-//             }
-//             
-//             for(AsyncMultiProcedure<T>  proc : p) proc.finished(graph);
-//        }
-        
     }
 
     final synchronized public void except(AsyncReadGraph graph, Throwable t) {
 
        assert(isPending());
 
-//        ArrayList<AsyncMultiProcedure<T>> p = null;
-
         synchronized(this) {
-
                except(t);
-//            p = procs;
-//            procs = null; 
-        
         }
         
-//        if(p != null) {
-//             for(AsyncMultiProcedure<T>  proc : p) proc.exception(graph, t);
-//        }
-        
     }
 
-    final synchronized public void addOrSet(Object item) {
+    @SuppressWarnings("unchecked")
+       final synchronized public void addOrSet(Object item) {
 
        assert(isPending());
        
@@ -120,42 +91,43 @@ final public class AsyncMultiReadEntry<T> extends CacheEntryBase {
         return new Query() {
 
                        @Override
-                       public void recompute(ReadGraphImpl graph, Object provider, CacheEntry entry) {
-                               
-                               QueryProcessor qp = (QueryProcessor)provider;
-
-                               final ReadGraphImpl parentGraph = ReadGraphImpl.forRecompute(entry, qp); 
+                       public void recompute(ReadGraphImpl graph) {
 
                                try {
 
-                                   request.perform(parentGraph , new AsyncMultiProcedure<T>() {
+                                       BlockingAsyncMultiProcedure<T> proc = new BlockingAsyncMultiProcedure<>(graph, new AsyncMultiProcedure<T>() {
 
-                        @Override
-                        public void execute(AsyncReadGraph graph, T result) {
-                            addOrSet(result);
-                        }
-                        
-                        public void finished(AsyncReadGraph graph) {
-                               finish(graph);
-                        };
-                                       
-                                       @Override
-                                       public void exception(AsyncReadGraph graph, Throwable t) {
-                            except(t);
-                           }
+                                               @Override
+                                               public void execute(AsyncReadGraph graph, T result) {
+                                                       addOrSet(result);
+                                               }
 
-                    });
+                                               public void finished(AsyncReadGraph graph) {
+                                                       finish(graph);
+                                               };
+
+                                               @Override
+                                               public void exception(AsyncReadGraph graph, Throwable t) {
+                                                       except(t);
+                                               }
+
+                                       }, request);
+
+                                       request.perform(graph , proc);
+
+                                       proc.get();
 
                                } catch (Throwable t) {
-                    except(t);
-                    if(DebugException.DEBUG) new DebugException(t).printStackTrace();
-                }
+                                       
+                                       except(t);
+                                       
+                               }
                                
                        }
 
                        @Override
                        public void removeEntry(QueryProcessor processor) {
-                       processor.asyncMultiReadMap.remove(request);
+                       processor.cache.remove(AsyncMultiReadEntry.this);
                        }
 
                        @Override
@@ -173,52 +145,40 @@ final public class AsyncMultiReadEntry<T> extends CacheEntryBase {
         
     }
 
+       @SuppressWarnings("unchecked")
        @Override
-       public void performFromCache(ReadGraphImpl graph, Object provider, Object procedure) {
-               
-               final AsyncMultiProcedure<T> proc = (AsyncMultiProcedure<T>)procedure;
+       public Object performFromCache(ReadGraphImpl graph, AsyncMultiProcedure<T> proc) {
 
         if(isExcepted()) {
 
             try {
                 proc.exception(graph, (Throwable)getResult());
             } catch (Throwable t) {
-                t.printStackTrace();
+                LOGGER.error("performFromCache proc.exception failed", t);
             }
-            
-            
+
         } else {
-            
+
             final ArrayList<T> values = (ArrayList<T>)getResult();
             for(T value : values) {
                 try {
                     proc.execute(graph, value);
                 } catch (Throwable t) {
-                    t.printStackTrace();
+                    LOGGER.error("performFromCache proc.execute failed", t);
                 }
             }
 
             try {
                 proc.finished(graph);
             } catch (Throwable t) {
-                t.printStackTrace();
+                LOGGER.error("performFromCache proc.finished failed", t);
             }
 
         }
                
-               
+               return getResult();
                
        }
-
-//     @Override
-//     public void performFromCache(int callerThread, Object provider,
-//                     Object procedure) {
-//
-//         QueryProvider2 queryProvider = (QueryProvider2)provider;
-//         ReadGraphImpl graph = ReadGraphImpl.forFromCache(callerThread, null, new ReadGraphSupportImpl(null, queryProvider, null));
-//             performFromCache(graph, provider, procedure);
-//             
-//     }
        
        @Override
        public String toString() {
@@ -226,4 +186,8 @@ final public class AsyncMultiReadEntry<T> extends CacheEntryBase {
                else return request.toString() + statusOrException;
        }
 
+       public Object compute(ReadGraphImpl graph, AsyncMultiProcedure<T> procedure) throws DatabaseException {
+               return graph.processor.cache.performQuery(graph, request, this, procedure);
+       }
+
 }