]> gerrit.simantics Code Review - simantics/platform.git/blobdiff - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Multiple readers in db client
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
index 9a86dc1fa2ae813b0b3a137b51a9063546af27af..030d512eb736e2f6303f0d68c1d0b0e3d2e57b97 100644 (file)
@@ -1,5 +1,5 @@
 /*******************************************************************************
- * Copyright (c) 2007, 2010 Association for Decentralized Information Management
+ * Copyright (c) 2007, 2018 Association for Decentralized Information Management
  * in Industry THTH ry.
  * All rights reserved. This program and the accompanying materials
  * are made available under the terms of the Eclipse Public License v1.0
@@ -34,10 +34,12 @@ import org.simantics.databoard.Bindings;
 import org.simantics.db.AsyncReadGraph;
 import org.simantics.db.ChangeSet;
 import org.simantics.db.DevelopmentKeys;
+import org.simantics.db.Disposable;
 import org.simantics.db.ExternalValueSupport;
 import org.simantics.db.Metadata;
 import org.simantics.db.MonitorContext;
 import org.simantics.db.MonitorHandler;
+import org.simantics.db.ReadGraph;
 import org.simantics.db.Resource;
 import org.simantics.db.ResourceSerializer;
 import org.simantics.db.Session;
@@ -51,10 +53,13 @@ import org.simantics.db.common.Indexing;
 import org.simantics.db.common.TransactionPolicyRelease;
 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
+import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
+import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
@@ -74,6 +79,7 @@ import org.simantics.db.exception.ResourceNotFoundException;
 import org.simantics.db.exception.RuntimeDatabaseException;
 import org.simantics.db.exception.ServiceException;
 import org.simantics.db.exception.ServiceNotFoundException;
+import org.simantics.db.impl.BlockingAsyncProcedure;
 import org.simantics.db.impl.ClusterBase;
 import org.simantics.db.impl.ClusterI;
 import org.simantics.db.impl.ClusterTraitsBase;
@@ -87,7 +93,9 @@ import org.simantics.db.impl.graph.WriteGraphImpl;
 import org.simantics.db.impl.graph.WriteSupport;
 import org.simantics.db.impl.internal.RandomAccessValueSupport;
 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
-import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
+import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
+import org.simantics.db.impl.query.QueryCache;
+import org.simantics.db.impl.query.QueryCacheBase;
 import org.simantics.db.impl.query.QueryProcessor;
 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
@@ -135,6 +143,7 @@ import org.simantics.db.service.ClusteringSupport;
 import org.simantics.db.service.CollectionSupport;
 import org.simantics.db.service.DebugSupport;
 import org.simantics.db.service.DirectQuerySupport;
+import org.simantics.db.service.EventSupport;
 import org.simantics.db.service.GraphChangeListenerSupport;
 import org.simantics.db.service.InitSupport;
 import org.simantics.db.service.LifecycleSupport;
@@ -158,6 +167,7 @@ import org.simantics.utils.DataContainer;
 import org.simantics.utils.Development;
 import org.simantics.utils.threads.logger.ITask;
 import org.simantics.utils.threads.logger.ThreadLogger;
+import org.slf4j.LoggerFactory;
 
 import gnu.trove.procedure.TLongProcedure;
 import gnu.trove.set.hash.TLongHashSet;
@@ -165,6 +175,8 @@ import gnu.trove.set.hash.TLongHashSet;
 
 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
 
+    private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
+
     protected static final boolean DEBUG        = false;
 
     private static final boolean DIAGNOSTICS       = false;
@@ -281,6 +293,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
+        serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
         ServiceActivityUpdaterForWriteTransactions.register(this);
 
         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
@@ -406,12 +419,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             }
         }
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 if(Development.DEVELOPMENT) {
                     try {
@@ -429,6 +440,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                 try {
 
                     flushCounter = 0;
+                    Disposable.safeDispose(clientChanges);
                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                     VirtualGraph vg = getProvider(request.getProvider());
@@ -464,7 +476,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     }
 
 
-                    assert(!queryProvider2.dirty);
+                    assert(!queryProvider2.cache.dirty);
 
                 } catch (Throwable e) {
 
@@ -497,7 +509,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 //                        state.cancelCommit(context, clusterStream);
 //                        if (!empty) {
 //                            if (!context.isOk()) // this is a blocking operation
-//                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
+//                                throw new InternalException("Cancel failed. This should never happen.");
 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
 //                        }
 //                        state.cancelCommit2(context, clusterStream);
@@ -543,18 +555,17 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 VirtualGraph vg = getProvider(request.getProvider());
@@ -624,16 +635,14 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     @Override
     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
 
-        final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
+        final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
                 Procedure<Object> stateProcedure = new Procedure<Object>() {
@@ -673,10 +682,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                 delayedWriteState = null;
 
-                ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
+                ITask task2 = ThreadLogger.task("DelayedWriteCommit");
                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                 flushCounter = 0;
+                Disposable.safeDispose(clientChanges);
                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                 acquireWriteOnly();
@@ -1018,15 +1028,13 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                        try {
 
                                int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
-                               int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
-                               int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
 
                                ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
                                ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
 
                                clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
-                               clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
+                               clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
                                clusterTranslator.removeStatement(cluster);
 
                                queryProvider2.invalidateResource(s);
@@ -1296,6 +1304,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
             flushCounter = 0;
+            Disposable.safeDispose(clientChanges);
             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
             acquireWriteOnly();
@@ -1367,12 +1376,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
@@ -1381,6 +1388,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
 
                         flushCounter = 0;
+                        Disposable.safeDispose(clientChanges);
                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
 
                         acquireWriteOnly();
@@ -1466,12 +1474,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert (request != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleWrite(new SessionTask(request, thread) {
+        requestManager.scheduleWrite(new SessionTask(null) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
 
@@ -1490,25 +1496,29 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         assert (request != null);
         assert (procedure != null);
 
-        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
-
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
                 ListenerBase listener = getListenerBase(procedure);
 
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
+                // This is never synced but increase to prevent it from visiting 0
+                newGraph.asyncBarrier.inc();
+
                 try {
 
                     if (listener != null) {
 
                        try {
-                               newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
+                               
+                               AsyncProcedure ap = new AsyncProcedure<T>() {
 
                                        @Override
                                        public void exception(AsyncReadGraph graph, Throwable t) {
@@ -1526,7 +1536,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                                                procedure.execute(graph, t);
                                        }
 
-                               }, listener);
+                               };
+                               
+                               QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
+                               
                        } catch (Throwable t) {
                            // This is handled by the AsyncProcedure
                                //Logger.defaultLogError("Internal error", t);
@@ -1536,8 +1549,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             T t = request.perform(newGraph);
 
                             try {
@@ -1581,9 +1592,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                             }
 
                         }
-
-//                        newGraph.state.barrier.dec();
-//                        newGraph.waitAsync(request);
+                        
+                        task.finish();
 
                     }
 
@@ -1606,13 +1616,87 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
+                ITask task = ThreadLogger.task(request);
+
+                final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
+
+                try {
+
+                    if (listener != null) {
+
+                        try {
+                               QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
+                                               } catch (DatabaseException e) {
+                                                       Logger.defaultLogError(e);
+                                               }
+
+                    } else {
+
+                       BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
+                               
+                               public void execute(AsyncReadGraph graph_, T result) {
+                                       task.finish();
+                                       super.execute(graph_, result);
+                               }
+                               
+                               public void exception(AsyncReadGraph graph_, Throwable t) {
+                                       task.finish();
+                                       super.exception(graph_, t);
+                               }
+                               
+                       };
+
+                       try {
+
+                            request.perform(newGraph, wrap);
+                            wrap.dec();
+                            wrap.get();
+
+                        } catch (DatabaseException e) {
+
+                                                       Logger.defaultLogError(e);
+
+                        }
+
+                    }
+
+                } finally {
+
+                    fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                }
+
+            }
+
+        });
+
+    }
+
+    public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
+
+        assert (request != null);
+        assert (procedure != null);
+
+        int thread = request.hashCode() & queryProvider2.THREAD_MASK;
+
+        int sync = notify != null ? thread : -1;
+
+        requestManager.scheduleRead(new SessionRead(null, notify) {
+
+            @Override
+            public void run0(int thread) {
+
+                fireSessionVariableChange(SessionVariables.QUEUED_READS);
+
+                ListenerBase listener = getListenerBase(procedure);
+
                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
 
                 try {
@@ -1625,22 +1709,15 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     } else {
 
-                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
-                                procedure, "request");
+                        final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
 
                         try {
 
-//                            newGraph.state.barrier.inc();
-
                             request.perform(newGraph, wrapper);
 
-//                            newGraph.waitAsync(request);
-
                         } catch (Throwable t) {
 
-                            wrapper.exception(newGraph, t);
-//                            newGraph.waitAsync(request);
-
+                            t.printStackTrace();
 
                         }
 
@@ -1657,7 +1734,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
     }
-
+    
     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
 
         assert (request != null);
@@ -1667,10 +1744,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int sync = notify != null ? thread : -1;
 
-        requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
+        requestManager.scheduleRead(new SessionRead(null, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
@@ -1721,10 +1798,10 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
 
-        requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
+        requestManager.scheduleRead(new SessionRead(throwable, notify) {
 
             @Override
-            public void run(int thread) {
+            public void run0(int thread) {
 
                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
 
@@ -1736,25 +1813,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
                     if (listener != null) {
 
-                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
-
-                            @Override
-                            public void exception(Throwable t) {
-                                procedure.exception(t);
-                                if(throwable != null) {
-                                    throwable.set(t);
-                                }
-                            }
-
-                            @Override
-                            public void execute(T t) {
-                                if(result != null) result.set(t);
-                                procedure.execute(t);
-                            }
-
-                        }, listener);
-
-//                        newGraph.waitAsync(request);
+                        try {
+                            QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
+                        } catch (DatabaseException e) {
+                            Logger.defaultLogError(e);
+                        }
 
                     } else {
 
@@ -2430,7 +2493,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
                     try {
                         l.graphChanged(e2);
                     } catch (Throwable ex) {
-                        ex.printStackTrace();
+                        LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
                     }
                 }
 
@@ -2439,7 +2502,7 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
             }
 
         } catch (Throwable t) {
-            t.printStackTrace();
+            LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
         }
 
     }
@@ -3044,21 +3107,21 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         final ArrayList<T> result = new ArrayList<T>();
         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
 
-        syncRequest(request, new AsyncMultiProcedure<T>() {
+        syncRequest(request, new SyncMultiProcedure<T>() {
 
             @Override
-            public void execute(AsyncReadGraph graph, T t) {
+            public void execute(ReadGraph graph, T t) {
                 synchronized(result) {
                     result.add(t);
                 }
             }
 
             @Override
-            public void finished(AsyncReadGraph graph) {
+            public void finished(ReadGraph graph) {
             }
 
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 exception.set(t);
             }
 
@@ -3076,33 +3139,27 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
     }
 
     @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
+    public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
+        throw new Error("Not implemented!");
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        return syncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
-    }
-
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
-        assertNotSession();
-        return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
         assertNotSession();
-        return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3137,9 +3194,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         });
 
         Throwable t = exception.get();
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
+        if (t != null) {
+            if (t instanceof DatabaseException)
+                throw (DatabaseException) t;
+            else
+                throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
         }
 
         return result;
@@ -3258,38 +3317,33 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         assert(request != null);
 
-        asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
+        asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
             @Override
-            public void exception(AsyncReadGraph graph, Throwable t) {
+            public void exception(ReadGraph graph, Throwable t) {
                 t.printStackTrace();
             }
         });
 
     }
 
-    @Override
-    public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
-        asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
-    }
-
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, (SyncMultiProcedure<T>)procedure);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
-        asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
+       scheduleRequest(request, procedure, null);
     }
 
     @Override
     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
-        asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
+        asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
     }
 
     @Override
@@ -3331,17 +3385,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
     }
 
-    @Override
-    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
-        assertNotSession();
-        throw new Error("Not implemented!");
-    }
-
-    @Override
-    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
-        throw new Error("Not implemented!");
-    }
-
     @Override
     final public <T> void asyncRequest(final ExternalRead<T> request) {
 
@@ -3361,28 +3404,6 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
         asyncRequest(request, (Procedure<T>)procedure);
     }
 
-
-
-    void check(Throwable t) throws DatabaseException {
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception", t);
-        }
-    }
-
-    void check(DataContainer<Throwable> container) throws DatabaseException {
-        Throwable t = container.get();
-        if(t != null) {
-            if(t instanceof DatabaseException) throw (DatabaseException)t;
-            else throw new DatabaseException("Unexpected exception", t);
-        }
-    }
-
-
-
-
-
-
     boolean sameProvider(Write request) {
         if(writeState.getGraph().provider != null) {
             return writeState.getGraph().provider.equals(request.getProvider());
@@ -3399,9 +3420,11 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
 
     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
+
     private void assertNotSession() throws DatabaseException {
         Thread current = Thread.currentThread();
-        if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
+        if (sessionThreads.contains(current))
+            throw new ServiceException("Caller is already inside a transaction.");
     }
 
     void assertAlive() {
@@ -3526,8 +3549,8 @@ public abstract class SessionImplSocket implements Session, WriteRequestSchedule
 
         while(dirtyPrimitives) {
             dirtyPrimitives = false;
-            getQueryProvider2().performDirtyUpdates(writer);
-            getQueryProvider2().performScheduledUpdates(writer);
+            getQueryProvider2().propagateChangesInQueryCache(writer);
+            getQueryProvider2().listening.fireListeners(writer);
         }
 
         fireMetadataListeners(writer, clientChanges);