]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Removed contact application support prints
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index 80dc3b9bbbbea42e31c1ee3fdc09f613e4753ec1..30e0dde5244de43dbd11ff9582015dad0d831f76 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -27,16 +27,19 @@ import java.util.TreeMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 
 import org.eclipse.core.runtime.Platform;
 import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.ChangeSet;
 import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
 import org.simantics.db.ExternalValueSupport;
 import org.simantics.db.Metadata;
 import org.simantics.db.MonitorContext;
 import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.ResourceSerializer;
 import org.simantics.db.Session;
@@ -50,10 +53,13 @@ import org.simantics.db.common.Indexing;
 import org.simantics.db.common.TransactionPolicyRelease;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
@@ -73,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.exception.ServiceException;
 import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.ClusterBase;
 import org.simantics.db.impl.ClusterI;
 import org.simantics.db.impl.ClusterTraitsBase;
@@ -86,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.impl.graph.WriteSupport;
 import org.simantics.db.impl.internal.RandomAccessValueSupport;
 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
 import org.simantics.db.impl.query.QueryProcessor;
 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
@@ -155,7 +164,6 @@ import org.simantics.db.service.XSupport;
 import org.simantics.layer0.Layer0;
 import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
-import org.simantics.utils.datastructures.Callback;
 import org.simantics.utils.threads.logger.ITask;
 import org.simantics.utils.threads.logger.ThreadLogger;
 
@@ -324,31 +332,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         private Object result;
         final Semaphore sema = new Semaphore(0);
         private Throwable throwable = null;
-        final Callback<DatabaseException> callback = new Callback<DatabaseException>() {
-            @Override
-            public void run(DatabaseException e) {
-                synchronized (TaskHelper.this) {
-                    throwable = e;
-                }
+        final Consumer<DatabaseException> callback = e -> {
+            synchronized (TaskHelper.this) {
+                throwable = e;
             }
         };
         final Procedure<Object> proc = new Procedure<Object>() {
             @Override
             public void execute(Object result) {
-                callback.run(null);
+                callback.accept(null);
             }
             @Override
             public void exception(Throwable t) {
                 if (t instanceof DatabaseException)
-                    callback.run((DatabaseException)t);
+                    callback.accept((DatabaseException)t);
                 else
-                    callback.run(new DatabaseException("" + name + "operation failed.", t));
+                    callback.accept(new DatabaseException("" + name + "operation failed.", t));
             }
         };
         final WriteTraits writeTraits = new WriteTraits() {};
         TaskHelper(String name) {
             this.name = name;
         }
+        @SuppressWarnings("unchecked")
         <T> T getResult() {
             return (T)result;
         }
@@ -387,7 +393,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             return null;
     }
 
-    public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify) {
+    public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
         scheduleRequest(request, callback, notify, null);
     }
 
@@ -395,7 +401,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
      */
     @Override
-    public <T> void scheduleRequest(final Write request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+    public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
         assert (request != null);
 
@@ -410,7 +416,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -431,6 +437,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 try {
 
                     flushCounter = 0;
+                    Disposable.safeDispose(clientChanges);
                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                     VirtualGraph vg = getProvider(request.getProvider());
@@ -440,12 +447,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         @Override
                         public void execute(Object result) {
-                            if(callback != null) callback.run(null);
+                            if(callback != null) callback.accept(null);
                         }
 
                         @Override
                         public void exception(Throwable t) {
-                            if(callback != null) callback.run((DatabaseException)t);
+                            if(callback != null) callback.accept((DatabaseException)t);
                         }
 
                     });
@@ -466,7 +473,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     }
 
 
-                    assert(!queryProvider2.dirty);
+                    assert(!queryProvider2.cache.dirty);
 
                 } catch (Throwable e) {
 
@@ -499,7 +506,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 //                        state.cancelCommit(context, clusterStream);
 //                        if (!empty) {
 //                            if (!context.isOk()) // this is a blocking operation
-//                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+//                                throw new InternalException("Cancel failed. This should never happen.");
 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
 //                        }
 //                        state.cancelCommit2(context, clusterStream);
@@ -547,7 +554,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -557,6 +564,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 VirtualGraph vg = getProvider(request.getProvider());
@@ -616,7 +624,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
     }
 
-    public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify) {
+    public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
         scheduleRequest(request, callback, notify, null);
     }
 
@@ -624,7 +632,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
      */
     @Override
-    public <T> void scheduleRequest(final DelayedWrite request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+    public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
 
@@ -632,7 +640,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -642,13 +650,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     @Override
                     public void execute(Object result) {
                         if (callback != null)
-                            callback.run(null);
+                            callback.accept(null);
                     }
                     @Override
                     public void exception(Throwable t) {
                         if (callback != null) {
-                            if (t instanceof DatabaseException) callback.run((DatabaseException) t);
-                            else callback.run(new DatabaseException(t));
+                            if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
+                            else callback.accept(new DatabaseException(t));
                         } else
                             Logger.defaultLogError("Unhandled exception", t);
                     }
@@ -665,6 +673,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 } catch (Throwable e) {
                     delayedWriteState.except(e);
                     total.finish();
+                    dwg.close();
                     return;
                 } finally {
 //                    newGraph.state.barrier.dec();
@@ -678,6 +687,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 acquireWriteOnly();
@@ -848,14 +858,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         private void maintainCluster(ClusterImpl before, ClusterI after_) {
             if(after_ != null && after_ != before) {
                 ClusterImpl after = (ClusterImpl)after_;
-                if(currentCluster == before) currentCluster = after;
+                if(currentCluster == before) {
+                    currentCluster = after;
+                }
                 clusterTable.replaceCluster(after);
             }
         }
 
         public int createResourceKey(int foreignCounter) throws DatabaseException {
-            if(currentCluster == null)
+            if(currentCluster == null) {
                 currentCluster = getNewResourceCluster();
+            }
             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
                 newCluster.foreignLookup = new byte[foreignCounter];
@@ -1016,15 +1029,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                        try {
 
                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
-                               int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
-                               int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
 
                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
 
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);
 
                                queryProvider2.invalidateResource(s);
@@ -1289,11 +1300,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         try {
 
-            int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+            //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
             flushCounter = 0;
+            Disposable.safeDispose(clientChanges);
             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
             acquireWriteOnly();
@@ -1354,12 +1366,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
     }
 
-    public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify) {
+    public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
         scheduleRequest(request, callback, notify, null);
     }
 
     @Override
-    public <T> void scheduleRequest(final WriteOnly request, final Callback<DatabaseException> callback, final Semaphore notify, Boolean combine) {
+    public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
         assertAlive();
 
@@ -1367,7 +1379,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -1379,6 +1391,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                         flushCounter = 0;
+                        Disposable.safeDispose(clientChanges);
                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                         acquireWriteOnly();
@@ -1392,12 +1405,12 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                             @Override
                             public void execute(Object result) {
-                                if(callback != null) callback.run(null);
+                                if(callback != null) callback.accept(null);
                             }
 
                             @Override
                             public void exception(Throwable t) {
-                                if(callback != null) callback.run((DatabaseException)t);
+                                if(callback != null) callback.accept((DatabaseException)t);
                             }
 
                         });
@@ -1420,7 +1433,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                             if(!(e instanceof CancelTransactionException)) {
                                 if (callback != null)
-                                    callback.run(new DatabaseException(e));
+                                    callback.accept(new DatabaseException(e));
                             }
 
                             writeState.except(e);
@@ -1466,7 +1479,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(true) {
 
             @Override
             public void run(int thread) {
@@ -1488,9 +1501,9 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+        //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1506,7 +1519,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     if (listener != null) {
 
                        try {
-                               newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+                               
+                               AsyncProcedure ap = new AsyncProcedure<T>() {
 
                                        @Override
                                        public void exception(AsyncReadGraph graph, Throwable t) {
@@ -1524,7 +1538,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                                procedure.execute(graph, t);
                                        }
 
-                               }, listener);
+                               };
+                               
+                               QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+                               
                        } catch (Throwable t) {
                            // This is handled by the AsyncProcedure
                                //Logger.defaultLogError("Internal error", t);
@@ -1604,13 +1621,77 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
+
+            @Override
+            public void run(int thread) {
+
+                fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+                try {
+
+                    if (listener != null) {
+
+                        try {
+                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                               //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
+                                                       //newGraph.processor.query(newGraph, request, null, procedure, listener);
+                                               } catch (DatabaseException e) {
+                                                       Logger.defaultLogError(e);
+                                               }
+
+                    } else {
+
+//                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
+//                                procedure, "request");
+
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, procedure, request);
+
+                       try {
+
+                            request.perform(newGraph, wrap);
+                            wrap.get();
+
+                        } catch (DatabaseException e) {
+
+                                                       Logger.defaultLogError(e);
+
+                        }
+
+                    }
+
+                } finally {
+
+                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                }
+
+            }
+
+        });
+
+    }
+
+    public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+        assert (request != null);
+        assert (procedure != null);
+
+        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+        int sync = notify != null ? thread : -1;
+
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ListenerBase listener = getListenerBase(procedure);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
                 try {
@@ -1623,22 +1704,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     } else {
 
-                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-                                procedure, "request");
+                        final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             request.perform(newGraph, wrapper);
 
-//                            newGraph.waitAsync(request);
-
                         } catch (Throwable t) {
 
-                            wrapper.exception(newGraph, t);
-//                            newGraph.waitAsync(request);
-
+                            t.printStackTrace();
 
                         }
 
@@ -1655,7 +1729,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
     }
-
+    
     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
 
         assert (request != null);
@@ -1665,7 +1739,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int sync = notify != null ? thread : -1;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
             public void run(int thread) {
@@ -1719,7 +1793,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
             public void run(int thread) {
@@ -1734,25 +1808,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     if (listener != null) {
 
-                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
-                            @Override
-                            public void exception(Throwable t) {
-                                procedure.exception(t);
-                                if(throwable != null) {
-                                    throwable.set(t);
-                                }
-                            }
-
-                            @Override
-                            public void execute(T t) {
-                                if(result != null) result.set(t);
-                                procedure.execute(t);
-                            }
-
-                        }, listener);
-
-//                        newGraph.waitAsync(request);
+                        try {
+                            QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
 
                     } else {
 
@@ -1959,14 +2019,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assertAlive();
         Semaphore notify = new Semaphore(0);
         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
-        scheduleRequest(request, new Callback<DatabaseException>() {
-
-            @Override
-            public void run(DatabaseException e) {
-              exception.set(e);
-            }
-
-        }, notify);
+        scheduleRequest(request, e -> exception.set(e), notify);
         acquire(notify, request);
         if(exception.get() != null) throw exception.get();
     }
@@ -2060,12 +2113,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assertNotSession();
         Semaphore notify = new Semaphore(0);
         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
-        scheduleRequest(request, new Callback<DatabaseException>() {
-            @Override
-            public void run(DatabaseException e) {
-                exception.set(e);
-            }
-        }, notify);
+        scheduleRequest(request, e -> exception.set(e), notify);
         acquire(notify, request);
         if(exception.get() != null) throw exception.get();
     }
@@ -2076,12 +2124,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assertAlive();
         Semaphore notify = new Semaphore(0);
         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
-        scheduleRequest(request, new Callback<DatabaseException>() {
-            @Override
-            public void run(DatabaseException e) {
-                exception.set(e);
-            }
-        }, notify);
+        scheduleRequest(request, e -> exception.set(e), notify);
         acquire(notify, request);
         if(exception.get() != null) throw exception.get();
     }
@@ -2113,7 +2156,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public void asyncRequest(final Write request, final Callback<DatabaseException> callback) {
+    public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
 
         scheduleRequest(request, callback, null);
 
@@ -2141,7 +2184,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public void asyncRequest(final DelayedWrite request, final Callback<DatabaseException> callback) {
+    public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
 
         scheduleRequest(request, callback, null);
 
@@ -2158,7 +2201,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public void asyncRequest(final WriteOnly request, final Callback<DatabaseException> callback) {
+    public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
 
         scheduleRequest(request, callback, null);
 
@@ -2282,16 +2325,16 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         return true;
     }
 
-    static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Callback<ReadGraphImpl> runnable) {
+    static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
 
-        Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+        Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
 
             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
 
             @Override
-            public void run(ReadGraphImpl graph) {
+            public void accept(ReadGraphImpl graph) {
                 if(ready.decrementAndGet() == 0) {
-                    runnable.run(graph);
+                    runnable.accept(graph);
                 }
             }
 
@@ -2305,24 +2348,24 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     e.printStackTrace();
                 }
             } else {
-                composite.run(graph);
+                composite.accept(graph);
             }
         }
 
-        composite.run(graph);
+        composite.accept(graph);
 
     }
 
-    static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Callback<ReadGraphImpl> runnable) {
+    static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
 
-        Callback<ReadGraphImpl> composite = new Callback<ReadGraphImpl>() {
+        Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
 
             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
 
             @Override
-            public void run(ReadGraphImpl graph) {
+            public void accept(ReadGraphImpl graph) {
                 if(ready.decrementAndGet() == 0) {
-                    runnable.run(graph);
+                    runnable.accept(graph);
                 }
             }
 
@@ -2336,11 +2379,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     e.printStackTrace();
                 }
             } else {
-                composite.run(graph);
+                composite.accept(graph);
             }
         }
 
-        composite.run(graph);
+        composite.accept(graph);
 
     }
 
@@ -3059,21 +3102,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         final ArrayList<T> result = new ArrayList<T>();
         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-        syncRequest(request, new AsyncMultiProcedure<T>() {
+        syncRequest(request, new SyncMultiProcedure<T>() {
 
             @Override
-            public void execute(AsyncReadGraph graph, T t) {
+            public void execute(ReadGraph graph, T t) {
                 synchronized(result) {
                     result.add(t);
                 }
             }
 
             @Override
-            public void finished(AsyncReadGraph graph) {
+            public void finished(ReadGraph graph) {
             }
 
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 exception.set(t);
             }
 
@@ -3091,33 +3134,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+    public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+        throw new Error("Not implemented!");
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        return syncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
-    }
-
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
-        assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3273,38 +3310,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert(request != null);
 
-        asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+        asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 t.printStackTrace();
             }
         });
 
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
-        asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+       scheduleRequest(request, procedure, null);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3346,17 +3378,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
-        assertNotSession();
-        throw new Error("Not implemented!");
-    }
-
-    @Override
-    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
-        throw new Error("Not implemented!");
-    }
-
     @Override
     final public <T> void asyncRequest(final ExternalRead<T> request) {