]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.common/src/org/simantics/db/common/processor/MergingGraphRequestProcessor.java
Direct and synchronization-free access to Layer0 resource class for DB
[simantics/platform.git] / bundles / org.simantics.db.common / src / org / simantics / db / common / processor / MergingGraphRequestProcessor.java
index 5e07bb034ecb924adc8567f99f09990e141e69ed..dfb5780f96da2edfe4e8f7079c20812a8dfd051f 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -19,20 +19,20 @@ import java.util.LinkedList;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Semaphore;
+import java.util.function.Consumer;
 
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.AsyncRequestProcessor;
 import org.simantics.db.ReadGraph;
-import org.simantics.db.RequestProcessor;
 import org.simantics.db.Resource;
 import org.simantics.db.Session;
 import org.simantics.db.WriteGraph;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
 import org.simantics.db.common.request.ReadRequest;
 import org.simantics.db.common.request.WriteRequest;
-import org.simantics.db.common.utils.Logger;
 import org.simantics.db.exception.CancelTransactionException;
 import org.simantics.db.exception.DatabaseException;
 import org.simantics.db.procedure.AsyncListener;
@@ -61,10 +61,13 @@ import org.simantics.db.request.WriteOnly;
 import org.simantics.db.request.WriteOnlyResult;
 import org.simantics.db.request.WriteResult;
 import org.simantics.utils.DataContainer;
-import org.simantics.utils.datastructures.Callback;
 import org.simantics.utils.datastructures.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-public class MergingGraphRequestProcessor implements RequestProcessor {
+public class MergingGraphRequestProcessor implements AsyncRequestProcessor {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MergingGraphRequestProcessor.class);
 
     private static class SyncWriteRequestAdapter implements Write {
 
@@ -128,7 +131,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
             try {
                 semaphore.acquire();
             } catch (InterruptedException e) {
-                       Logger.defaultLogError(e);
+                LOGGER.error("SyncWriteRequestAdapter interrupted", e);
             }
         }
 
@@ -223,7 +226,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                             try {
                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
                             } catch (InterruptedException e) {
-                                       Logger.defaultLogError(e);
+                                LOGGER.error("MergedRead interrupted", e);
                             }
                             if (requestQueue.isEmpty())
                                 break;
@@ -262,13 +265,13 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
                     } catch(Throwable t) {
 
-                               Logger.defaultLogError(t);
+                        LOGGER.error("MergedRead failed", t);
 
-                        if(currentRequest.second instanceof AsyncProcedure<?>) {
-                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
-                        } else {
-                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
-                        }
+//                        if(currentRequest.second instanceof AsyncProcedure<?>) {
+//                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
+//                        } else {
+//                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
+//                        }
 
                     }
 
@@ -281,30 +284,30 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
                 } else {
 
-                    try{
+                    try {
 
                         if(currentRequest.second instanceof AsyncProcedure<?>) {
                             if(currentRequest.first instanceof AsyncRead) {
                                 AsyncRead req = (AsyncRead)currentRequest.first;
-                                graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+                                graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
                             } else {
                                 Read req = (Read)currentRequest.first;
-                                graph.asyncRequest(req, (AsyncProcedure<?>)currentRequest.second);
+                                graph.syncRequest(req, (AsyncProcedure<?>)currentRequest.second);
                             }
                         } else {
                             AsyncMultiRead req = (AsyncMultiRead)currentRequest.first;
-                            graph.asyncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
+                            graph.syncRequest(req, (AsyncMultiProcedure<?>)currentRequest.second);
                         }
 
                     } catch(Throwable t) {
 
-                               Logger.defaultLogError(t);
+                        LOGGER.error("MergedRead failed", t);
 
-                        if(currentRequest.second instanceof AsyncProcedure<?>) {
-                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
-                        } else {
-                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
-                        }
+//                        if(currentRequest.second instanceof AsyncProcedure<?>) {
+//                            ((AsyncProcedure<?>)currentRequest.second).exception(graph, t);
+//                        } else {
+//                            ((AsyncMultiProcedure<?>)currentRequest.second).exception(graph, t);
+//                        }
 
                     }
                 }
@@ -355,7 +358,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                             try {
                                 MergingGraphRequestProcessor.this.wait(transactionKeepalivePeriod);
                             } catch (InterruptedException e) {
-                                       Logger.defaultLogError(e);
+                                LOGGER.error("RunnerWriteGraphRequest interrupted", e);
                             }
                             if (requestQueue.isEmpty())
                                 break;
@@ -374,7 +377,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                 }
 
                 @SuppressWarnings("unchecked")
-                Callback<Throwable> callback = (Callback<Throwable>)currentRequest.second;
+                Consumer<Throwable> callback = (Consumer<Throwable>)currentRequest.second;
 
                 if (currentRequest.first.getClass().equals(SyncWriteRequestAdapter.class)) {
 
@@ -383,10 +386,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                     try {
 //                        System.out.println("merg.sync " + adapter);
                         graph.syncRequest(adapter);
-                        if(callback != null) callback.run(null);
+                        if(callback != null) callback.accept(null);
                     } catch(Throwable t) {
-                               Logger.defaultLogError(t);
-                        if(callback != null) callback.run(t);
+                        LOGGER.error("RunnerWriteGraphRequest failed", t);
+                        if(callback != null) callback.accept(t);
                     }
 
                     adapter.release();
@@ -397,10 +400,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                     try {
                         if(currentRequest.first instanceof Write) graph.syncRequest((Write)currentRequest.first); 
                         else if(currentRequest.first instanceof DelayedWrite) graph.syncRequest((DelayedWrite)currentRequest.first); 
-                        if(callback != null) callback.run(null);
+                        if(callback != null) callback.accept(null);
                     } catch(Throwable t) {
-                               Logger.defaultLogError(t);
-                        if(callback != null) callback.run(t);
+                        LOGGER.error("RunnerWriteGraphRequest failed", t);
+                        if(callback != null) callback.accept(t);
                     }
 
                 }
@@ -443,10 +446,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
 //        System.out.println(this + " asyncRequest(ReadGraphRequest<QueryProcedure4<T>> request, QueryProcedure4<T> procedure)");
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, procedure);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -464,10 +467,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
 //        System.out.println(this + " asyncRequest(ReadGraphRequest<SingleQueryProcedure4<T>> request, SingleQueryProcedure4<T> procedure) " + this);
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, procedure);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -482,14 +485,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
     }
 
     @Override
-    public synchronized void asyncRequest(Write request, Callback<DatabaseException> callback) {
+    public synchronized void asyncRequest(Write request, Consumer<DatabaseException> callback) {
 
 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, callback);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -505,14 +508,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
     }
 
     @Override
-    public synchronized void asyncRequest(DelayedWrite request, Callback<DatabaseException> callback) {
+    public synchronized void asyncRequest(DelayedWrite request, Consumer<DatabaseException> callback) {
 
 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, callback);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -528,14 +531,14 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
     }
 
     @Override
-    public synchronized void asyncRequest(WriteOnly request, Callback<DatabaseException> callback) {
+    public synchronized void asyncRequest(WriteOnly request, Consumer<DatabaseException> callback) {
 
 //        System.out.println(this + " asyncRequest(WriteGraphRequest request)");
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, callback);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, callback);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -572,7 +575,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         Throwable t = throwable.get();
 
         if(t != null) {
-               Logger.defaultLogError(t);
+            LOGGER.error("syncRequest(AsyncMultiRead, AsyncMultiProcedure) failed", t);
             throw new RuntimeException(t.getMessage());
         }
         
@@ -625,7 +628,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         Throwable t = throwable.get();
 
         if(t != null) {
-               Logger.defaultLogError(t);
+            LOGGER.error("syncRequest(AsyncRead, AsyncProcedure) failed", t);
             throw new RuntimeException(t.getMessage());
         }
         
@@ -691,7 +694,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
             @Override
             public void exception(Throwable t) {
-                       Logger.defaultLogError(t);
+                LOGGER.error("asyncRequest(AsyncRead) failed", t);
             }
 
         });
@@ -736,7 +739,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         Throwable t = throwable.get();
 
         if(t != null) {
-               Logger.defaultLogError(t);
+            LOGGER.error("syncRequest(AsyncRead) failed", t);
             throw new RuntimeException(t.getMessage());
         }
 
@@ -775,7 +778,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         Throwable t = throwable.get();
 
         if(t != null) {
-               Logger.defaultLogError(t);
+            LOGGER.error("syncRequest(AsyncMultiRead) failed", t);
             throw new RuntimeException(t.getMessage());
         }
 
@@ -878,7 +881,7 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
             @Override
             public void exception(Throwable t) {
-                Logger.defaultLogError(t);
+                LOGGER.error("asyncRequest(Read) failed", t);
             }
 
         });
@@ -889,10 +892,10 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
     public synchronized <T> void asyncRequest(Read<T> request,
             AsyncProcedure<T> procedure) {
 
-        if (requestSet.contains(request))
+        Pair<Object, Object> pair = Pair.make(request, procedure);
+        if (requestSet.contains(pair))
             return;
 
-        Pair<Object, Object> pair = new Pair<Object, Object>(request, procedure);
         requestQueue.add(pair);
         requestSet.add(pair);
 
@@ -917,17 +920,17 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         final ArrayList<T> result = new ArrayList<T>();
         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-        syncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+        syncRequest(request, new SyncMultiProcedureAdapter<T>() {
 
             @Override
-            public void execute(AsyncReadGraph graph, T t) {
+            public void execute(ReadGraph graph, T t) {
                 synchronized(result) {
                     result.add(t);
                 }
             }
 
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 exception.set(t);
             }
 
@@ -947,11 +950,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         return result;
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure)  {
-        throw new UnsupportedOperationException("Not implemented");
-    }
-
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
         throw new UnsupportedOperationException("Not implemented");
@@ -967,11 +965,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
         throw new UnsupportedOperationException("Not implemented");
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> request, AsyncMultiProcedure<T> procedure) {
-        throw new UnsupportedOperationException("Not implemented");
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
         throw new UnsupportedOperationException("Not implemented");
@@ -1103,12 +1096,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
-        throw new UnsupportedOperationException("Not implemented.");
-
-    }
-
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
         throw new UnsupportedOperationException("Not implemented.");
@@ -1153,12 +1140,6 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
 
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiListener<T> arg1) {
-        throw new UnsupportedOperationException("Not implemented.");
-
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> arg0, SyncMultiListener<T> arg1) {
         throw new UnsupportedOperationException("Not implemented.");
@@ -1327,4 +1308,9 @@ public class MergingGraphRequestProcessor implements RequestProcessor {
                throw new UnsupportedOperationException();
        }
        
+       @Override
+       public <T> T l0() {
+               return processor.l0();
+       }
+       
 }