]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Trying to wait for procedures
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator maintained by the workbench. These services are
206      * initialized during workbench during the <code>init</code> method.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         requestManager.scheduleWrite(new SessionTask(null) {
418
419             @Override
420             public void run0(int thread) {
421
422                 if(Development.DEVELOPMENT) {
423                     try {
424                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
425                         System.err.println("perform write '" + request + "'");
426                     } catch (Throwable t) {
427                         Logger.defaultLogError(t);
428                     }
429                 }
430
431                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
432
433                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
434
435                 try {
436
437                     flushCounter = 0;
438                     Disposable.safeDispose(clientChanges);
439                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
440
441                     VirtualGraph vg = getProvider(request.getProvider());
442
443                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
444                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
445
446                         @Override
447                         public void execute(Object result) {
448                             if(callback != null) callback.accept(null);
449                         }
450
451                         @Override
452                         public void exception(Throwable t) {
453                             if(callback != null) callback.accept((DatabaseException)t);
454                         }
455
456                     });
457
458                     assert (null != writer);
459 //                    writer.state.barrier.inc();
460
461                     try {
462                         request.perform(writer);
463                         assert (null != writer);
464                     } catch (Throwable t) {
465                         if (!(t instanceof CancelTransactionException))
466                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
467                         writeState.except(t);
468                     } finally {
469 //                        writer.state.barrier.dec();
470 //                        writer.waitAsync(request);
471                     }
472
473
474                     assert(!queryProvider2.cache.dirty);
475
476                 } catch (Throwable e) {
477
478                     // Log it first, just to be safe that the error is always logged.
479                     if (!(e instanceof CancelTransactionException))
480                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
481
482 //                    writeState.getGraph().state.barrier.dec();
483 //                    writeState.getGraph().waitAsync(request);
484
485                     writeState.except(e);
486
487 //                    try {
488 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
489 //                        // All we can do here is to log those, can't really pass them anywhere.
490 //                        if (callback != null) {
491 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
492 //                            else callback.run(new DatabaseException(e));
493 //                        }
494 //                    } catch (Throwable e2) {
495 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
496 //                    }
497 //                    boolean empty = clusterStream.reallyFlush();
498 //                    int callerThread = -1;
499 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
500 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
501 //                    try {
502 //                        // Send all local changes to server so it can calculate correct reverse change set.
503 //                        // This will call clusterStream.accept().
504 //                        state.cancelCommit(context, clusterStream);
505 //                        if (!empty) {
506 //                            if (!context.isOk()) // this is a blocking operation
507 //                                throw new InternalException("Cancel failed. This should never happen.");
508 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
509 //                        }
510 //                        state.cancelCommit2(context, clusterStream);
511 //                    } catch (DatabaseException e3) {
512 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
513 //                    } finally {
514 //                        clusterStream.setOff(false);
515 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
516 //                    }
517
518                 } finally {
519
520                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
521
522                 }
523
524                 task.finish();
525
526                 if(Development.DEVELOPMENT) {
527                     try {
528                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
529                         System.err.println("finish write '" + request + "'");
530                     } catch (Throwable t) {
531                         Logger.defaultLogError(t);
532                     }
533                 }
534
535             }
536
537         }, combine);
538
539     }
540
541     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
542         scheduleRequest(request, procedure, notify, null);
543     }
544
545     /* (non-Javadoc)
546      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
547      */
548     @Override
549     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
550
551         assert (request != null);
552
553         requestManager.scheduleWrite(new SessionTask(null) {
554
555             @Override
556             public void run0(int thread) {
557
558                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
559
560                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
561
562                 flushCounter = 0;
563                 Disposable.safeDispose(clientChanges);
564                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
565
566                 VirtualGraph vg = getProvider(request.getProvider());
567                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
568
569                 try {
570                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571                     writeState = writeStateT;
572
573                     assert (null != writer);
574 //                    writer.state.barrier.inc();
575                     writeStateT.setResult(request.perform(writer));
576                     assert (null != writer);
577
578 //                    writer.state.barrier.dec();
579 //                    writer.waitAsync(null);
580
581                 } catch (Throwable e) {
582
583 //                    writer.state.barrier.dec();
584 //                    writer.waitAsync(null);
585
586                     writeState.except(e);
587
588 //                  state.stopWriteTransaction(clusterStream);
589 //
590 //              } catch (Throwable e) {
591 //                  // Log it first, just to be safe that the error is always logged.
592 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
593 //
594 //                  try {
595 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 //                      // All we can do here is to log those, can't really pass them anywhere.
597 //                      if (procedure != null) {
598 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 //                          else procedure.exception(new DatabaseException(e));
600 //                      }
601 //                  } catch (Throwable e2) {
602 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
603 //                  }
604 //
605 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
606 //
607 //                  state.stopWriteTransaction(clusterStream);
608
609                 } finally {
610                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
611                 }
612
613 //              if(notify != null) notify.release();
614
615                 task.finish();
616
617             }
618
619         }, combine);
620
621     }
622
623     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624         scheduleRequest(request, callback, notify, null);
625     }
626
627     /* (non-Javadoc)
628      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
629      */
630     @Override
631     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
632
633         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
634
635         assert (request != null);
636
637         requestManager.scheduleWrite(new SessionTask(null) {
638
639             @Override
640             public void run0(int thread) {
641                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
642
643                 Procedure<Object> stateProcedure = new Procedure<Object>() {
644                     @Override
645                     public void execute(Object result) {
646                         if (callback != null)
647                             callback.accept(null);
648                     }
649                     @Override
650                     public void exception(Throwable t) {
651                         if (callback != null) {
652                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653                             else callback.accept(new DatabaseException(t));
654                         } else
655                             Logger.defaultLogError("Unhandled exception", t);
656                     }
657                 };
658
659                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661                 DelayedWriteGraph dwg = null;
662 //                newGraph.state.barrier.inc();
663
664                 try {
665                     dwg = new DelayedWriteGraph(newGraph);
666                     request.perform(dwg);
667                 } catch (Throwable e) {
668                     delayedWriteState.except(e);
669                     total.finish();
670                     dwg.close();
671                     return;
672                 } finally {
673 //                    newGraph.state.barrier.dec();
674 //                    newGraph.waitAsync(request);
675                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
676                 }
677
678                 delayedWriteState = null;
679
680                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682
683                 flushCounter = 0;
684                 Disposable.safeDispose(clientChanges);
685                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686
687                 acquireWriteOnly();
688
689                 VirtualGraph vg = getProvider(request.getProvider());
690                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
691
692                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
693
694                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
695
696                 assert (null != writer);
697 //                writer.state.barrier.inc();
698
699                 try {
700                     // Cannot cancel
701                     dwg.commit(writer, request);
702
703                         if(defaultClusterSet != null) {
704                                 XSupport xs = getService(XSupport.class);
705                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706                                 ClusteringSupport cs = getService(ClusteringSupport.class);
707                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
708                         }
709
710                     // This makes clusters available from server
711                     clusterStream.reallyFlush();
712
713                     releaseWriteOnly(writer);
714                     handleUpdatesAndMetadata(writer);
715
716                 } catch (ServiceException e) {
717 //                    writer.state.barrier.dec();
718 //                    writer.waitAsync(null);
719
720                     // These shall be requested from server
721                     clusterTable.removeWriteOnlyClusters();
722                     // This makes clusters available from server
723                     clusterStream.reallyFlush();
724
725                     releaseWriteOnly(writer);
726                     writeState.except(e);
727                 } finally {
728                     // Debugging & Profiling
729                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
730                     task2.finish();
731                     total.finish();
732                 }
733             }
734
735         }, combine);
736
737     }
738
739     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740         scheduleRequest(request, procedure, notify, null);
741     }
742
743     /* (non-Javadoc)
744      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
745      */
746     @Override
747     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748         throw new Error("Not implemented");
749     }
750
751     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754             createdClusters.add(cluster.clusterId);
755         }
756         return cluster;
757     }
758
759     class WriteOnlySupport implements WriteSupport {
760
761         ClusterStream stream;
762         ClusterImpl currentCluster;
763
764         public WriteOnlySupport() {
765             this.stream = clusterStream;
766         }
767
768         @Override
769         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771         }
772
773         @Override
774         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
775
776             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
777
778             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779                 if(s != queryProvider2.getRootLibrary())
780                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
781
782             try {
783                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784             } catch (DatabaseException e) {
785                 Logger.defaultLogError(e);
786                 //throw e;
787             }
788
789             clientChanges.invalidate(s);
790
791             if (cluster.isWriteOnly())
792                 return;
793             queryProvider2.updateStatements(s, p);
794
795         }
796
797         @Override
798         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800         }
801
802         @Override
803         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
804
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808             } catch (DatabaseException e) {
809                 Logger.defaultLogError(e);
810             }
811
812             clientChanges.invalidate(rid);
813
814             if (cluster.isWriteOnly())
815                 return;
816             queryProvider2.updateValue(rid);
817
818         }
819
820         @Override
821         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
822
823             if(amount < 65536) {
824                 claimValue(provider,resource, reader.readBytes(null, amount));
825                 return;
826             }
827
828             byte[] bytes = new byte[65536];
829
830             int rid = ((ResourceImpl)resource).id;
831             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832             try {
833                 int left = amount;
834                 while(left > 0) {
835                     int block = Math.min(left, 65536);
836                     reader.readBytes(bytes, block);
837                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
838                     left -= block;
839                 }
840             } catch (DatabaseException e) {
841                 Logger.defaultLogError(e);
842             }
843
844             clientChanges.invalidate(rid);
845
846             if (cluster.isWriteOnly())
847                 return;
848             queryProvider2.updateValue(rid);
849
850         }
851
852         private void maintainCluster(ClusterImpl before, ClusterI after_) {
853             if(after_ != null && after_ != before) {
854                 ClusterImpl after = (ClusterImpl)after_;
855                 if(currentCluster == before) {
856                     currentCluster = after;
857                 }
858                 clusterTable.replaceCluster(after);
859             }
860         }
861
862         public int createResourceKey(int foreignCounter) throws DatabaseException {
863             if(currentCluster == null) {
864                 currentCluster = getNewResourceCluster();
865             }
866             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
867                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868                 newCluster.foreignLookup = new byte[foreignCounter];
869                 currentCluster = newCluster;
870                 if (DEBUG)
871                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
872             }
873             return currentCluster.createResource(clusterTranslator);
874         }
875
876         @Override
877         public Resource createResource(VirtualGraph provider) throws DatabaseException {
878             if(currentCluster == null) {
879                 if (null != defaultClusterSet) {
880                         ResourceImpl result = getNewResource(defaultClusterSet);
881                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
882                     return result;
883                 } else {
884                     currentCluster = getNewResourceCluster();
885                 }
886             }
887             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888                 if (null != defaultClusterSet) {
889                         ResourceImpl result = getNewResource(defaultClusterSet);
890                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
891                     return result;
892                 } else {
893                     currentCluster = getNewResourceCluster();
894                 }
895             }
896             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
897         }
898
899         @Override
900         public Resource createResource(VirtualGraph provider, long clusterId)
901         throws DatabaseException {
902             return getNewResource(clusterId);
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, Resource clusterSet)
907         throws DatabaseException {
908             return getNewResource(clusterSet);
909         }
910
911         @Override
912         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913                 throws DatabaseException {
914             getNewClusterSet(clusterSet);
915         }
916
917         @Override
918         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919         throws ServiceException {
920             return containsClusterSet(clusterSet);
921         }
922
923         public void selectCluster(long cluster) {
924                 currentCluster = clusterTable.getClusterByClusterId(cluster);
925                 long setResourceId = clusterSetsSupport.getSet(cluster);
926                 clusterSetsSupport.put(setResourceId, cluster);
927         }
928
929         @Override
930         public Resource setDefaultClusterSet(Resource clusterSet)
931         throws ServiceException {
932                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
933                 if(clusterSet != null) {
934                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
935                         currentCluster = clusterTable.getClusterByClusterId(id);
936                         return result;
937                 } else {
938                         currentCluster = null;
939                         return null;
940                 }
941         }
942
943         @Override
944         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
945
946                  provider = getProvider(provider);
947              if (null == provider) {
948                  int key = ((ResourceImpl)resource).id;
949
950
951
952                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 //                      if(key != queryProvider2.getRootLibrary())
955 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
956
957 //                 try {
958 //                     cluster.removeValue(key, clusterTranslator);
959 //                 } catch (DatabaseException e) {
960 //                     Logger.defaultLogError(e);
961 //                     return;
962 //                 }
963
964                  clusterTable.writeOnlyInvalidate(cluster);
965
966                  try {
967                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969                          clusterTranslator.removeValue(cluster);
970                  } catch (DatabaseException e) {
971                          Logger.defaultLogError(e);
972                  }
973
974                  queryProvider2.invalidateResource(key);
975                  clientChanges.invalidate(key);
976
977              } else {
978                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979                  queryProvider2.updateValue(querySupport.getId(resource));
980                  clientChanges.claimValue(resource);
981              }
982
983
984         }
985
986         @Override
987         public void flush(boolean intermediate) {
988             throw new UnsupportedOperationException();
989         }
990
991         @Override
992         public void flushCluster() {
993             clusterTable.flushCluster(graphSession);
994             if(defaultClusterSet != null) {
995                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
996             }
997             currentCluster = null;
998         }
999
1000         @Override
1001         public void flushCluster(Resource r) {
1002             throw new UnsupportedOperationException("flushCluster resource " + r);
1003         }
1004
1005         @Override
1006         public void gc() {
1007         }
1008
1009         @Override
1010         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1011                 Resource object) {
1012
1013                 int s = ((ResourceImpl)subject).id;
1014                 int p = ((ResourceImpl)predicate).id;
1015                 int o = ((ResourceImpl)object).id;
1016
1017                 provider = getProvider(provider);
1018                 if (null == provider) {
1019
1020                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021                         clusterTable.writeOnlyInvalidate(cluster);
1022
1023                         try {
1024
1025                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026
1027                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1028                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1029
1030                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1031                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033                                 clusterTranslator.removeStatement(cluster);
1034
1035                                 queryProvider2.invalidateResource(s);
1036                                 clientChanges.invalidate(s);
1037
1038                         } catch (DatabaseException e) {
1039
1040                                 Logger.defaultLogError(e);
1041
1042                         }
1043
1044                         return true;
1045
1046                 } else {
1047                         
1048                         ((VirtualGraphImpl)provider).deny(s, p, o);
1049                         queryProvider2.invalidateResource(s);
1050                         clientChanges.invalidate(s);
1051
1052                         return true;
1053
1054                 }
1055
1056         }
1057
1058         @Override
1059         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1060             throw new UnsupportedOperationException();
1061         }
1062
1063         @Override
1064         public boolean writeOnly() {
1065             return true;
1066         }
1067
1068         @Override
1069         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1070             writeSupport.performWriteRequest(graph, request);
1071         }
1072
1073         @Override
1074         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1075             throw new UnsupportedOperationException();
1076         }
1077
1078         @Override
1079         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1080             throw new UnsupportedOperationException();
1081         }
1082
1083         @Override
1084         public <T> void addMetadata(Metadata data) throws ServiceException {
1085             writeSupport.addMetadata(data);
1086         }
1087
1088         @Override
1089         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1090             return writeSupport.getMetadata(clazz);
1091         }
1092
1093         @Override
1094         public TreeMap<String, byte[]> getMetadata() {
1095             return writeSupport.getMetadata();
1096         }
1097
1098         @Override
1099         public void commitDone(WriteTraits writeTraits, long csid) {
1100             writeSupport.commitDone(writeTraits, csid);
1101         }
1102
1103         @Override
1104         public void clearUndoList(WriteTraits writeTraits) {
1105             writeSupport.clearUndoList(writeTraits);
1106         }
1107         @Override
1108         public int clearMetadata() {
1109             return writeSupport.clearMetadata();
1110         }
1111
1112                 @Override
1113                 public void startUndo() {
1114                         writeSupport.startUndo();
1115                 }
1116     }
1117
1118     class VirtualWriteOnlySupport implements WriteSupport {
1119
1120 //        @Override
1121 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1122 //                Resource object) {
1123 //            throw new UnsupportedOperationException();
1124 //        }
1125
1126         @Override
1127         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1128
1129             TransientGraph impl = (TransientGraph)provider;
1130             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1131             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1132             clientChanges.claim(subject, predicate, object);
1133
1134         }
1135
1136         @Override
1137         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1138
1139             TransientGraph impl = (TransientGraph)provider;
1140             impl.claim(subject, predicate, object);
1141             getQueryProvider2().updateStatements(subject, predicate);
1142             clientChanges.claim(subject, predicate, object);
1143
1144         }
1145
1146         @Override
1147         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1148             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1149         }
1150
1151         @Override
1152         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1153             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1154             getQueryProvider2().updateValue(resource);
1155             clientChanges.claimValue(resource);
1156         }
1157
1158         @Override
1159         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1160             byte[] value = reader.readBytes(null, amount);
1161             claimValue(provider, resource, value);
1162         }
1163
1164         @Override
1165         public Resource createResource(VirtualGraph provider) {
1166             TransientGraph impl = (TransientGraph)provider;
1167             return impl.getResource(impl.newResource(false));
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider, long clusterId) {
1172             throw new UnsupportedOperationException();
1173         }
1174
1175         @Override
1176         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1177         throws DatabaseException {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws ServiceException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public Resource setDefaultClusterSet(Resource clusterSet)
1195         throws ServiceException {
1196                 return null;
1197         }
1198
1199         @Override
1200         public void denyValue(VirtualGraph provider, Resource resource) {
1201             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1202             getQueryProvider2().updateValue(querySupport.getId(resource));
1203             // NOTE: this only keeps track of value changes by-resource.
1204             clientChanges.claimValue(resource);
1205         }
1206
1207         @Override
1208         public void flush(boolean intermediate) {
1209             throw new UnsupportedOperationException();
1210         }
1211
1212         @Override
1213         public void flushCluster() {
1214             throw new UnsupportedOperationException();
1215         }
1216
1217         @Override
1218         public void flushCluster(Resource r) {
1219             throw new UnsupportedOperationException("Resource " + r);
1220         }
1221
1222         @Override
1223         public void gc() {
1224         }
1225
1226         @Override
1227         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1228             TransientGraph impl = (TransientGraph) provider;
1229             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1230             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1231             clientChanges.deny(subject, predicate, object);
1232             return true;
1233         }
1234
1235         @Override
1236         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1237             throw new UnsupportedOperationException();
1238         }
1239
1240         @Override
1241         public boolean writeOnly() {
1242             return true;
1243         }
1244
1245         @Override
1246         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1247             throw new UnsupportedOperationException();
1248         }
1249
1250         @Override
1251         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T> void addMetadata(Metadata data) throws ServiceException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public TreeMap<String, byte[]> getMetadata() {
1272             throw new UnsupportedOperationException();
1273         }
1274
1275         @Override
1276         public void commitDone(WriteTraits writeTraits, long csid) {
1277         }
1278
1279         @Override
1280         public void clearUndoList(WriteTraits writeTraits) {
1281         }
1282
1283         @Override
1284         public int clearMetadata() {
1285             return 0;
1286         }
1287
1288                 @Override
1289                 public void startUndo() {
1290                 }
1291     }
1292
1293     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1294
1295         try {
1296
1297             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1298
1299             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1300
1301             flushCounter = 0;
1302             Disposable.safeDispose(clientChanges);
1303             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1304
1305             acquireWriteOnly();
1306
1307             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1308
1309             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1310
1311             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1312             writeState = writeStateT;
1313
1314             assert (null != writer);
1315 //            writer.state.barrier.inc();
1316             long start = System.nanoTime();
1317             T result = request.perform(writer);
1318             long duration = System.nanoTime() - start;
1319             if (DEBUG) {
1320                 System.err.println("################");
1321                 System.err.println("WriteOnly duration " + 1e-9*duration);
1322             }
1323             writeStateT.setResult(result);
1324
1325             // This makes clusters available from server
1326             clusterStream.reallyFlush();
1327
1328             // This will trigger query updates
1329             releaseWriteOnly(writer);
1330             assert (null != writer);
1331
1332             handleUpdatesAndMetadata(writer);
1333
1334         } catch (CancelTransactionException e) {
1335
1336             releaseWriteOnly(writeState.getGraph());
1337
1338             clusterTable.removeWriteOnlyClusters();
1339             state.stopWriteTransaction(clusterStream);
1340
1341         } catch (Throwable e) {
1342
1343             e.printStackTrace();
1344
1345             releaseWriteOnly(writeState.getGraph());
1346
1347             clusterTable.removeWriteOnlyClusters();
1348
1349             if (callback != null)
1350                 callback.exception(new DatabaseException(e));
1351
1352             state.stopWriteTransaction(clusterStream);
1353             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1354
1355         } finally  {
1356
1357             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358
1359         }
1360
1361     }
1362
1363     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1364         scheduleRequest(request, callback, notify, null);
1365     }
1366
1367     @Override
1368     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1369
1370         assertAlive();
1371
1372         assert (request != null);
1373
1374         requestManager.scheduleWrite(new SessionTask(null) {
1375
1376             @Override
1377             public void run0(int thread) {
1378
1379                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1380
1381                     try {
1382
1383                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1384
1385                         flushCounter = 0;
1386                         Disposable.safeDispose(clientChanges);
1387                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1388
1389                         acquireWriteOnly();
1390
1391                         VirtualGraph vg = getProvider(request.getProvider());
1392                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1393
1394                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1395
1396                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1397
1398                             @Override
1399                             public void execute(Object result) {
1400                                 if(callback != null) callback.accept(null);
1401                             }
1402
1403                             @Override
1404                             public void exception(Throwable t) {
1405                                 if(callback != null) callback.accept((DatabaseException)t);
1406                             }
1407
1408                         });
1409
1410                         assert (null != writer);
1411 //                        writer.state.barrier.inc();
1412
1413                         try {
1414
1415                             request.perform(writer);
1416
1417                         } catch (Throwable e) {
1418
1419 //                            writer.state.barrier.dec();
1420 //                            writer.waitAsync(null);
1421
1422                             releaseWriteOnly(writer);
1423
1424                             clusterTable.removeWriteOnlyClusters();
1425
1426                             if(!(e instanceof CancelTransactionException)) {
1427                                 if (callback != null)
1428                                     callback.accept(new DatabaseException(e));
1429                             }
1430
1431                             writeState.except(e);
1432
1433                             return;
1434
1435                         }
1436
1437                         // This makes clusters available from server
1438                         boolean empty = clusterStream.reallyFlush();
1439                         // This was needed to make WO requests call metadata listeners.
1440                         // NOTE: the calling event does not contain clientChanges information.
1441                         if (!empty && clientChanges.isEmpty())
1442                             clientChanges.setNotEmpty(true);
1443                         releaseWriteOnly(writer);
1444                         assert (null != writer);
1445                     } finally  {
1446
1447                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1448
1449                     }
1450
1451
1452                 task.finish();
1453
1454             }
1455
1456         }, combine);
1457
1458     }
1459
1460     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1461         scheduleRequest(request, callback, notify, null);
1462     }
1463
1464     /* (non-Javadoc)
1465      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1466      */
1467     @Override
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1469
1470         assert (request != null);
1471
1472         requestManager.scheduleWrite(new SessionTask(null) {
1473
1474             @Override
1475             public void run0(int thread) {
1476
1477                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1478
1479                 performWriteOnly(request, notify, callback);
1480
1481                 task.finish();
1482
1483             }
1484
1485         }, combine);
1486
1487     }
1488
         /**
          * Queues a {@link Read} request on the request manager's read queue.
          *
          * When the queued task runs it fires the {@code QUEUED_READS} session variable
          * change, creates a new {@link ReadGraphImpl} and then takes one of two paths:
          * if {@code getListenerBase(procedure)} yields a listener, the request goes through
          * {@code QueryCache.runnerReadEntry} with a wrapping procedure; otherwise the
          * request is performed directly and {@code procedure} is called with the outcome.
          * {@code QUEUED_READS} is fired again when the task ends.
          *
          * @param request   the read to perform; must not be {@code null}
          * @param procedure receives the result or the failure; must not be {@code null}
          * @param notify    passed to the {@code SessionRead} task; may be {@code null}
          * @param throwable optional container that receives a failure; on the direct path
          *                  failures are logged instead when it is {@code null}
          * @param result    optional container that receives the result value
          */
1489     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1490
1491         assert (request != null);
1492         assert (procedure != null);
1493
1494         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1495
1496         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1497
1498             @Override
1499             public void run0(int thread) {
1500
1501                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1502
1503                 ListenerBase listener = getListenerBase(procedure);
1504
1505                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1506
1507                 // This is never synced but increase to prevent it from visiting 0
1508                 newGraph.asyncBarrier.inc();
1509
1510                 try {
1511
1512                     if (listener != null) {
1513
                             // Listener path: wrap the caller's procedure so that the optional
                             // result/throwable containers are filled in as well.
1514                         try {
1515                                 
1516                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1517
1518                                         @Override
1519                                         public void exception(AsyncReadGraph graph, Throwable t) {
1520                                                 procedure.exception(graph, t);
1521                                                 if(throwable != null) {
1522                                                         throwable.set(t);
1523                                                 } else {
1524                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1525                                                 }
1526                                         }
1527
1528                                         @Override
1529                                         public void execute(AsyncReadGraph graph, T t) {
1530                                                 if(result != null) result.set(t);
1531                                                 procedure.execute(graph, t);
1532                                         }
1533
1534                                 };
1535                                 
1536                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1537                                 
1538                         } catch (Throwable t) {
1539                             // This is handled by the AsyncProcedure
1540                                 //Logger.defaultLogError("Internal error", t);
1541                         }
1542
1543                     } else {
1544
                             // Direct path: perform the request here and report to the procedure.
1545                         try {
1546
1547 //                            newGraph.state.barrier.inc();
1548
1549                             T t = request.perform(newGraph);
1550
1551                             try {
1552
1553                                 if(result != null) result.set(t);
1554                                 procedure.execute(newGraph, t);
1555
1556                             } catch (Throwable th) {
1557
                                     // A failure inside procedure.execute is recorded or logged here;
                                     // procedure.exception is not called for it.
1558                                 if(throwable != null) {
1559                                     throwable.set(th);
1560                                 } else {
1561                                     Logger.defaultLogError("Unhandled exception", th);
1562                                 }
1563
1564                             }
1565
1566                         } catch (Throwable t) {
1567
                                 // The request itself failed: record or log it, then notify the procedure.
1568                             if (DEBUG)
1569                                 t.printStackTrace();
1570
1571                             if(throwable != null) {
1572                                 throwable.set(t);
1573                             } else {
1574                                 Logger.defaultLogError("Unhandled exception", t);
1575                             }
1576
1577                             try {
1578
1579                                 procedure.exception(newGraph, t);
1580
1581                             } catch (Throwable t2) {
1582
                                     // A failure in the exception handler replaces the stored throwable.
1583                                 if(throwable != null) {
1584                                     throwable.set(t2);
1585                                 } else {
1586                                     Logger.defaultLogError("Unhandled exception", t2);
1587                                 }
1588
1589                             }
1590
1591                         }
1592
1593 //                        newGraph.state.barrier.dec();
1594 //                        newGraph.waitAsync(request);
1595
1596                     }
1597
1598                 } finally {
1599
1600                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1601
1602                 }
1603
1604             }
1605
1606         });
1607
1608     }
1609
1610     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1611
1612         assert (request != null);
1613         assert (procedure != null);
1614
1615         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1616
1617         requestManager.scheduleRead(new SessionRead(null, notify) {
1618
1619             @Override
1620             public void run0(int thread) {
1621
1622                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1623
1624                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1625
1626                 try {
1627
1628                     if (listener != null) {
1629
1630                         try {
1631                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1632                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1633                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1634                                                 } catch (DatabaseException e) {
1635                                                         Logger.defaultLogError(e);
1636                                                 }
1637
1638                     } else {
1639
1640 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1641 //                                procedure, "request");
1642
1643                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1644
1645                         try {
1646
1647                             request.perform(newGraph, wrap);
1648                             wrap.get();
1649
1650                         } catch (DatabaseException e) {
1651
1652                                                         Logger.defaultLogError(e);
1653
1654                         }
1655
1656                     }
1657
1658                 } finally {
1659
1660                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1661
1662                 }
1663
1664             }
1665
1666         });
1667
1668     }
1669
1670     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1671
1672         assert (request != null);
1673         assert (procedure != null);
1674
1675         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1676
1677         int sync = notify != null ? thread : -1;
1678
1679         requestManager.scheduleRead(new SessionRead(null, notify) {
1680
1681             @Override
1682             public void run0(int thread) {
1683
1684                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1685
1686                 ListenerBase listener = getListenerBase(procedure);
1687
1688                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1689
1690                 try {
1691
1692                     if (listener != null) {
1693
1694                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1695
1696 //                        newGraph.waitAsync(request);
1697
1698                     } else {
1699
1700                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1701
1702                         try {
1703
1704                             request.perform(newGraph, wrapper);
1705
1706                         } catch (Throwable t) {
1707
1708                             t.printStackTrace();
1709
1710                         }
1711
1712                     }
1713
1714                 } finally {
1715
1716                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1717
1718                 }
1719
1720             }
1721
1722         });
1723
1724     }
1725     
1726     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1727
1728         assert (request != null);
1729         assert (procedure != null);
1730
1731         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1732
1733         int sync = notify != null ? thread : -1;
1734
1735         requestManager.scheduleRead(new SessionRead(null, notify) {
1736
1737             @Override
1738             public void run0(int thread) {
1739
1740                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1741
1742                 ListenerBase listener = getListenerBase(procedure);
1743
1744                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1745
1746                 try {
1747
1748                     if (listener != null) {
1749
1750                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1751
1752 //                        newGraph.waitAsync(request);
1753
1754                     } else {
1755
1756                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1757
1758                         try {
1759
1760                             request.perform(newGraph, wrapper);
1761
1762                         } catch (Throwable t) {
1763
1764                             t.printStackTrace();
1765
1766                         }
1767
1768                     }
1769
1770                 } finally {
1771
1772                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1773
1774                 }
1775
1776             }
1777
1778         });
1779
1780     }
1781
1782     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1783
1784         assert (request != null);
1785         assert (procedure != null);
1786
1787         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1788
1789         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1790
1791             @Override
1792             public void run0(int thread) {
1793
1794                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1795
1796                 ListenerBase listener = getListenerBase(procedure);
1797
1798                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1799
1800                 try {
1801
1802                     if (listener != null) {
1803
1804                         try {
1805                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1806                         } catch (DatabaseException e) {
1807                             Logger.defaultLogError(e);
1808                         }
1809
1810                     } else {
1811
1812 //                        newGraph.state.barrier.inc();
1813
1814                         request.register(newGraph, new Listener<T>() {
1815
1816                             @Override
1817                             public void exception(Throwable t) {
1818                                 if(throwable != null) throwable.set(t);
1819                                 procedure.exception(t);
1820 //                                newGraph.state.barrier.dec();
1821                             }
1822
1823                             @Override
1824                             public void execute(T t) {
1825                                 if(result != null) result.set(t);
1826                                 procedure.execute(t);
1827 //                                newGraph.state.barrier.dec();
1828                             }
1829
1830                             @Override
1831                             public boolean isDisposed() {
1832                                 return true;
1833                             }
1834
1835                         });
1836
1837 //                        newGraph.waitAsync(request);
1838
1839                     }
1840
1841                 } finally {
1842
1843                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1844
1845                 }
1846
1847             }
1848
1849         });
1850
1851     }
1852
1853
1854     @Override
1855     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1856
1857         scheduleRequest(request, procedure, null, null, null);
1858
1859     }
1860
1861     @Override
1862     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1863         assertNotSession();
1864         Semaphore notify = new Semaphore(0);
1865         DataContainer<Throwable> container = new DataContainer<Throwable>();
1866         DataContainer<T> result = new DataContainer<T>();
1867         scheduleRequest(request, procedure, notify, container, result);
1868         acquire(notify, request);
1869         Throwable throwable = container.get();
1870         if(throwable != null) {
1871             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1872             else throw new DatabaseException("Unexpected exception", throwable);
1873         }
1874         return result.get();
1875     }
1876
1877     @Override
1878     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1879         assertNotSession();
1880         Semaphore notify = new Semaphore(0);
1881         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1882         final DataContainer<T> resultContainer = new DataContainer<T>();
1883         scheduleRequest(request, new AsyncProcedure<T>() {
1884             @Override
1885             public void exception(AsyncReadGraph graph, Throwable throwable) {
1886                 exceptionContainer.set(throwable);
1887                 procedure.exception(graph, throwable);
1888             }
1889             @Override
1890             public void execute(AsyncReadGraph graph, T result) {
1891                 resultContainer.set(result);
1892                 procedure.execute(graph, result);
1893             }
1894         }, getListenerBase(procedure), notify);
1895         acquire(notify, request, procedure);
1896         Throwable throwable = exceptionContainer.get();
1897         if (throwable != null) {
1898             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1899             else throw new DatabaseException("Unexpected exception", throwable);
1900         }
1901         return resultContainer.get();
1902     }
1903
1904
1905     @Override
1906     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1907         assertNotSession();
1908         Semaphore notify = new Semaphore(0);
1909         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1910         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1911             @Override
1912             public void exception(AsyncReadGraph graph, Throwable throwable) {
1913                 exceptionContainer.set(throwable);
1914                 procedure.exception(graph, throwable);
1915             }
1916             @Override
1917             public void execute(AsyncReadGraph graph, T result) {
1918                 procedure.execute(graph, result);
1919             }
1920             @Override
1921             public void finished(AsyncReadGraph graph) {
1922                 procedure.finished(graph);
1923             }
1924         }, notify);
1925         acquire(notify, request, procedure);
1926         Throwable throwable = exceptionContainer.get();
1927         if (throwable != null) {
1928             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1929             else throw new DatabaseException("Unexpected exception", throwable);
1930         }
1931         // TODO: implement return value
1932         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1933         return null;
1934     }
1935
         /**
          * Performs the external read synchronously by delegating to the two-argument
          * {@code syncRequest} with a fresh {@link ProcedureAdapter} as the procedure.
          * The commented-out code below is an earlier inline implementation.
          */
1936     @Override
1937     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1938         return syncRequest(request, new ProcedureAdapter<T>());
1939
1940
1941 //        assert(request != null);
1942 //
1943 //        final DataContainer<T> result = new DataContainer<T>();
1944 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1945 //
1946 //        syncRequest(request, new Procedure<T>() {
1947 //
1948 //            @Override
1949 //            public void execute(T t) {
1950 //                result.set(t);
1951 //            }
1952 //
1953 //            @Override
1954 //            public void exception(Throwable t) {
1955 //                exception.set(t);
1956 //            }
1957 //
1958 //        });
1959 //
1960 //        Throwable t = exception.get();
1961 //        if(t != null) {
1962 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1963 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1964 //        }
1965 //
1966 //        return result.get();
1967
1968     }
1969
         /**
          * Delegates to the {@code Procedure} overload; the cast to {@code Procedure<T>}
          * selects that overload.
          */
1970     @Override
1971     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1972         return syncRequest(request, (Procedure<T>)procedure);
1973     }
1974
1975
1976 //    @Override
1977 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1978 //        assertNotSession();
1979 //        Semaphore notify = new Semaphore(0);
1980 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1981 //        DataContainer<T> result = new DataContainer<T>();
1982 //        scheduleRequest(request, procedure, notify, container, result);
1983 //        acquire(notify, request);
1984 //        Throwable throwable = container.get();
1985 //        if(throwable != null) {
1986 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1987 //            else throw new DatabaseException("Unexpected exception", throwable);
1988 //        }
1989 //        return result.get();
1990 //    }
1991
1992
1993     @Override
1994     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1995         assertNotSession();
1996         Semaphore notify = new Semaphore(0);
1997         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1998         final DataContainer<T> result = new DataContainer<T>();
1999         scheduleRequest(request, procedure, notify, container, result);
2000         acquire(notify, request);
2001         Throwable throwable = container.get();
2002         if (throwable != null) {
2003             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2004             else throw new DatabaseException("Unexpected exception", throwable);
2005         }
2006         return result.get();
2007     }
2008
2009     @Override
2010     public void syncRequest(Write request) throws DatabaseException  {
2011         assertNotSession();
2012         assertAlive();
2013         Semaphore notify = new Semaphore(0);
2014         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2015         scheduleRequest(request, e -> exception.set(e), notify);
2016         acquire(notify, request);
2017         if(exception.get() != null) throw exception.get();
2018     }
2019
2020     @Override
2021     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2022         assertNotSession();
2023         Semaphore notify = new Semaphore(0);
2024         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2025         final DataContainer<T> result = new DataContainer<T>();
2026         scheduleRequest(request, new Procedure<T>() {
2027
2028             @Override
2029             public void exception(Throwable t) {
2030               exception.set(t);
2031             }
2032
2033             @Override
2034             public void execute(T t) {
2035               result.set(t);
2036             }
2037
2038         }, notify);
2039         acquire(notify, request);
2040         if(exception.get() != null) {
2041             Throwable t = exception.get();
2042             if(t instanceof DatabaseException) throw (DatabaseException)t;
2043             else throw new DatabaseException(t);
2044         }
2045         return result.get();
2046     }
2047
2048     @Override
2049     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2050         assertNotSession();
2051         Semaphore notify = new Semaphore(0);
2052         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2053         final DataContainer<T> result = new DataContainer<T>();
2054         scheduleRequest(request, new Procedure<T>() {
2055
2056             @Override
2057             public void exception(Throwable t) {
2058               exception.set(t);
2059             }
2060
2061             @Override
2062             public void execute(T t) {
2063               result.set(t);
2064             }
2065
2066         }, notify);
2067         acquire(notify, request);
2068         if(exception.get() != null) {
2069             Throwable t = exception.get();
2070             if(t instanceof DatabaseException) throw (DatabaseException)t;
2071             else throw new DatabaseException(t);
2072         }
2073         return result.get();
2074     }
2075
2076     @Override
2077     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2078         assertNotSession();
2079         Semaphore notify = new Semaphore(0);
2080         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2081         final DataContainer<T> result = new DataContainer<T>();
2082         scheduleRequest(request, new Procedure<T>() {
2083
2084             @Override
2085             public void exception(Throwable t) {
2086               exception.set(t);
2087             }
2088
2089             @Override
2090             public void execute(T t) {
2091               result.set(t);
2092             }
2093
2094         }, notify);
2095         acquire(notify, request);
2096         if(exception.get() != null) {
2097             Throwable t = exception.get();
2098             if(t instanceof DatabaseException) throw (DatabaseException)t;
2099             else throw new DatabaseException(t);
2100         }
2101         return result.get();
2102     }
2103
2104     @Override
2105     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2106         assertNotSession();
2107         Semaphore notify = new Semaphore(0);
2108         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2109         scheduleRequest(request, e -> exception.set(e), notify);
2110         acquire(notify, request);
2111         if(exception.get() != null) throw exception.get();
2112     }
2113
2114     @Override
2115     public void syncRequest(WriteOnly request) throws DatabaseException  {
2116         assertNotSession();
2117         assertAlive();
2118         Semaphore notify = new Semaphore(0);
2119         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2120         scheduleRequest(request, e -> exception.set(e), notify);
2121         acquire(notify, request);
2122         if(exception.get() != null) throw exception.get();
2123     }
2124
2125     /*
2126      *
2127      * GraphRequestProcessor interface
2128      */
2129
2130     @Override
2131     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2132
2133         scheduleRequest(request, procedure, null, null);
2134
2135     }
2136
2137     @Override
2138     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2139
2140         scheduleRequest(request, procedure, null);
2141
2142     }
2143
2144     @Override
2145     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2146
2147         scheduleRequest(request, procedure, null, null, null);
2148
2149     }
2150
2151     @Override
2152     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2153
2154         scheduleRequest(request, callback, null);
2155
2156     }
2157
2158     @Override
2159     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2160
2161         scheduleRequest(request, procedure, null);
2162
2163     }
2164
2165     @Override
2166     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2167
2168         scheduleRequest(request, procedure, null);
2169
2170     }
2171
2172     @Override
2173     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2174
2175         scheduleRequest(request, procedure, null);
2176
2177     }
2178
2179     @Override
2180     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2181
2182         scheduleRequest(request, callback, null);
2183
2184     }
2185
2186     @Override
2187     public void asyncRequest(final Write r) {
2188         asyncRequest(r, null);
2189     }
2190
2191     @Override
2192     public void asyncRequest(final DelayedWrite r) {
2193         asyncRequest(r, null);
2194     }
2195
2196     @Override
2197     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2198
2199         scheduleRequest(request, callback, null);
2200
2201     }
2202
2203     @Override
2204     public void asyncRequest(final WriteOnly request) {
2205
2206         asyncRequest(request, null);
2207
2208     }
2209
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link AsyncProcedure} as the receiver.
          */
2210     @Override
2211     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2212         r.request(this, procedure);
2213     }
2214
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link Procedure} as the receiver.
          */
2215     @Override
2216     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2217         r.request(this, procedure);
2218     }
2219
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link SyncProcedure} as the receiver.
          */
2220     @Override
2221     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2222         r.request(this, procedure);
2223     }
2224
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link AsyncListener} as the receiver.
          */
2225     @Override
2226     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2227         r.request(this, procedure);
2228     }
2229
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link Listener} as the receiver.
          */
2230     @Override
2231     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2232         r.request(this, procedure);
2233     }
2234
         /**
          * Hands the read to {@code r.request}, with this session as the processor
          * and the given {@link SyncListener} as the receiver.
          */
2235     @Override
2236     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2237         r.request(this, procedure);
2238     }
2239
         /**
          * Runs the read through {@code r.request} with this session as the processor
          * and returns whatever that call returns.
          */
2240     @Override
2241     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2242         return r.request(this);
2243     }
2244
         /**
          * Runs the write through {@code r.request} with this session as the processor
          * and returns whatever that call returns.
          */
2245     @Override
2246     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2247         return r.request(this);
2248     }
2249
         /**
          * Hands the write to {@code r.request}, with this session as the processor
          * and the given {@link Procedure} as the receiver.
          */
2250     @Override
2251     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2252         r.request(this, procedure);
2253     }
2254
         /**
          * Hands the write to {@code r.request}, with this session as the processor
          * and a fresh {@link ProcedureAdapter} as the receiver.
          */
2255     @Override
2256     public <T> void async(WriteInterface<T> r) {
2257         r.request(this, new ProcedureAdapter<T>());
2258     }
2259
         /** Delegates to {@code state.incAsync()}. */
2260     //@Override
2261     public void incAsync() {
2262         state.incAsync();
2263     }
2264
         /** Delegates to {@code state.decAsync()}. */
2265     //@Override
2266     public void decAsync() {
2267         state.decAsync();
2268     }
2269
2270     public long getCluster(ResourceImpl resource) {
2271         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2272         return cluster.getClusterId();
2273     }
2274
2275     public long getCluster(int id) {
2276         if (clusterTable == null)
2277             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2278         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2279     }
2280
2281     public ResourceImpl getResource(int id) {
2282         return new ResourceImpl(resourceSupport, id);
2283     }
2284
2285     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2286         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2287         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2288         int key = proxy.getClusterKey();
2289         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2290         return new ResourceImpl(resourceSupport, resourceKey);
2291     }
2292
2293     public ResourceImpl getResource2(int id) {
2294         assert (id != 0);
2295         return new ResourceImpl(resourceSupport, id);
2296     }
2297
2298     final public int getId(ResourceImpl impl) {
2299         return impl.id;
2300     }
2301
2302     public static final Charset UTF8 = Charset.forName("utf-8");
2303
2304
2305
2306
2307     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2308         for(TransientGraph g : support.providers) {
2309             if(g.isPending(subject)) return false;
2310         }
2311         return true;
2312     }
2313
2314     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2315         for(TransientGraph g : support.providers) {
2316             if(g.isPending(subject, predicate)) return false;
2317         }
2318         return true;
2319     }
2320
2321     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2322
2323         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2324
2325             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2326
2327             @Override
2328             public void accept(ReadGraphImpl graph) {
2329                 if(ready.decrementAndGet() == 0) {
2330                     runnable.accept(graph);
2331                 }
2332             }
2333
2334         };
2335
2336         for(TransientGraph g : support.providers) {
2337             if(g.isPending(subject)) {
2338                 try {
2339                     g.load(graph, subject, composite);
2340                 } catch (DatabaseException e) {
2341                     e.printStackTrace();
2342                 }
2343             } else {
2344                 composite.accept(graph);
2345             }
2346         }
2347
2348         composite.accept(graph);
2349
2350     }
2351
2352     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2353
2354         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2355
2356             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2357
2358             @Override
2359             public void accept(ReadGraphImpl graph) {
2360                 if(ready.decrementAndGet() == 0) {
2361                     runnable.accept(graph);
2362                 }
2363             }
2364
2365         };
2366
2367         for(TransientGraph g : support.providers) {
2368             if(g.isPending(subject, predicate)) {
2369                 try {
2370                     g.load(graph, subject, predicate, composite);
2371                 } catch (DatabaseException e) {
2372                     e.printStackTrace();
2373                 }
2374             } else {
2375                 composite.accept(graph);
2376             }
2377         }
2378
2379         composite.accept(graph);
2380
2381     }
2382
2383 //    void dumpHeap() {
2384 //
2385 //        try {
2386 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2387 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2388 //                    "d:/heap" + flushCounter + ".txt", true);
2389 //        } catch (IOException e) {
2390 //            e.printStackTrace();
2391 //        }
2392 //
2393 //    }
2394
2395     void fireReactionsToSynchronize(ChangeSet cs) {
2396
2397         // Do not fire empty events
2398         if (cs.isEmpty())
2399             return;
2400
2401         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2402 //        g.state.barrier.inc();
2403
2404         try {
2405
2406             if (!cs.isEmpty()) {
2407                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2408                 for (ChangeListener l : changeListeners2) {
2409                     try {
2410                         l.graphChanged(e2);
2411                     } catch (Exception ex) {
2412                         ex.printStackTrace();
2413                     }
2414                 }
2415             }
2416
2417         } finally {
2418
2419 //            g.state.barrier.dec();
2420 //            g.waitAsync(null);
2421
2422         }
2423
2424     }
2425
2426     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2427         try {
2428
2429             // Do not fire empty events
2430             if (cs2.isEmpty())
2431                 return;
2432
2433 //            graph.restart();
2434 //            graph.state.barrier.inc();
2435
2436             try {
2437
2438                 if (!cs2.isEmpty()) {
2439
2440                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2441                     for (ChangeListener l : changeListeners2) {
2442                         try {
2443                             l.graphChanged(e2);
2444 //                            System.out.println("changelistener " + l);
2445                         } catch (Exception ex) {
2446                             ex.printStackTrace();
2447                         }
2448                     }
2449
2450                 }
2451
2452             } finally {
2453
2454 //                graph.state.barrier.dec();
2455 //                graph.waitAsync(null);
2456
2457             }
2458         } catch (Throwable t) {
2459             t.printStackTrace();
2460
2461         }
2462     }
2463     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2464
2465         try {
2466
2467             // Do not fire empty events
2468             if (cs2.isEmpty())
2469                 return;
2470
2471             // Do not fire on virtual requests
2472             if(graph.getProvider() != null)
2473                 return;
2474
2475             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2476
2477             try {
2478
2479                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2480                 for (ChangeListener l : metadataListeners) {
2481                     try {
2482                         l.graphChanged(e2);
2483                     } catch (Throwable ex) {
2484                         ex.printStackTrace();
2485                     }
2486                 }
2487
2488             } finally {
2489
2490             }
2491
2492         } catch (Throwable t) {
2493             t.printStackTrace();
2494         }
2495
2496     }
2497
2498
2499     /*
2500      * (non-Javadoc)
2501      *
2502      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2503      */
2504     @Override
2505     public <T> T getService(Class<T> api) {
2506         T t = peekService(api);
2507         if (t == null) {
2508             if (state.isClosed())
2509                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2510             throw new ServiceNotFoundException(this, api);
2511         }
2512         return t;
2513     }
2514
2515     /**
2516      * @return
2517      */
2518     protected abstract ServerInformation getCachedServerInformation();
2519
2520         private Class<?> serviceKey1 = null;
2521         private Class<?> serviceKey2 = null;
2522         private Object service1 = null;
2523         private Object service2 = null;
2524
2525     /*
2526      * (non-Javadoc)
2527      *
2528      * @see
2529      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2530      */
2531     @SuppressWarnings("unchecked")
2532     @Override
2533     public synchronized <T> T peekService(Class<T> api) {
2534
2535                 if(serviceKey1 == api) {
2536                         return (T)service1;
2537                 } else if (serviceKey2 == api) {
2538                         // Promote this key
2539                         Object result = service2;
2540                         service2 = service1;
2541                         serviceKey2 = serviceKey1;
2542                         service1 = result;
2543                         serviceKey1 = api;
2544                         return (T)result;
2545                 }
2546
2547         if (Layer0.class == api)
2548                 return (T) L0;
2549         if (ServerInformation.class == api)
2550             return (T) getCachedServerInformation();
2551         else if (WriteGraphImpl.class == api)
2552             return (T) writeState.getGraph();
2553         else if (ClusterBuilder.class == api)
2554             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2555         else if (ClusterBuilderFactory.class == api)
2556             return (T)new ClusterBuilderFactoryImpl(this);
2557
2558                 service2 = service1;
2559                 serviceKey2 = serviceKey1;
2560
2561         service1 = serviceLocator.peekService(api);
2562         serviceKey1 = api;
2563
2564         return (T)service1;
2565
2566     }
2567
2568     /*
2569      * (non-Javadoc)
2570      *
2571      * @see
2572      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2573      */
2574     @Override
2575     public boolean hasService(Class<?> api) {
2576         return serviceLocator.hasService(api);
2577     }
2578
2579     /**
2580      * @param api the api that must be implemented by the specified service
2581      * @param service the service implementation
2582      */
2583     @Override
2584     public <T> void registerService(Class<T> api, T service) {
2585         if(Layer0.class == api) {
2586                 L0 = (Layer0)service;
2587                 return;
2588         }
2589         serviceLocator.registerService(api, service);
2590         if (TransactionPolicySupport.class == api) {
2591             transactionPolicy = (TransactionPolicySupport)service;
2592             state.resetTransactionPolicy();
2593         }
2594         if (api == serviceKey1)
2595                 service1 = service;
2596         else if (api == serviceKey2)
2597                 service2 = service;
2598     }
2599
2600     // ----------------
2601     // SessionMonitor
2602     // ----------------
2603
2604     void fireSessionVariableChange(String variable) {
2605         for (MonitorHandler h : monitorHandlers) {
2606             MonitorContext ctx = monitorContexts.getLeft(h);
2607             assert ctx != null;
2608             // SafeRunner functionality repeated here to avoid dependency.
2609             try {
2610                 h.valuesChanged(ctx);
2611             } catch (Exception e) {
2612                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2613             } catch (LinkageError e) {
2614                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2615             }
2616         }
2617     }
2618
2619
2620     class ResourceSerializerImpl implements ResourceSerializer {
2621
2622         public long createRandomAccessId(int id) throws DatabaseException {
2623             if(id < 0)
2624                 return id;
2625             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2626             long cluster = getCluster(id);
2627             if (0 == cluster)
2628                 return 0; // Better to return 0 then invalid id.
2629             long result = ClusterTraitsBase.createResourceId(cluster, index);
2630             return result;
2631         }
2632
2633         @Override
2634         public long getRandomAccessId(Resource resource) throws DatabaseException {
2635
2636             ResourceImpl resourceImpl = (ResourceImpl) resource;
2637             return createRandomAccessId(resourceImpl.id);
2638
2639         }
2640
2641         @Override
2642         public int getTransientId(Resource resource) throws DatabaseException {
2643
2644             ResourceImpl resourceImpl = (ResourceImpl) resource;
2645             return resourceImpl.id;
2646
2647         }
2648
2649         @Override
2650         public String createRandomAccessId(Resource resource)
2651         throws InvalidResourceReferenceException {
2652
2653             if(resource == null) throw new IllegalArgumentException();
2654
2655             ResourceImpl resourceImpl = (ResourceImpl) resource;
2656
2657             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2658
2659             int r;
2660             try {
2661                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2662             } catch (DatabaseException e1) {
2663                 throw new InvalidResourceReferenceException(e1);
2664             }
2665             try {
2666                 // Serialize as '<resource index>_<cluster id>'
2667                 return "" + r + "_" + getCluster(resourceImpl);
2668             } catch (Throwable e) {
2669                 e.printStackTrace();
2670                 throw new InvalidResourceReferenceException(e);
2671             } finally {
2672             }
2673         }
2674
2675         public int getTransientId(long serialized) throws DatabaseException {
2676             if (serialized <= 0)
2677                 return (int)serialized;
2678             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2679             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2680             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2681             if (cluster == null)
2682                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2683             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2684             return key;
2685         }
2686
2687         @Override
2688         public Resource getResource(long randomAccessId) throws DatabaseException {
2689             return getResourceByKey(getTransientId(randomAccessId));
2690         }
2691
2692         @Override
2693         public Resource getResource(int transientId) throws DatabaseException {
2694             return getResourceByKey(transientId);
2695         }
2696
2697         @Override
2698         public Resource getResource(String randomAccessId)
2699         throws InvalidResourceReferenceException {
2700             try {
2701                 int i = randomAccessId.indexOf('_');
2702                 if (i == -1)
2703                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2704                             + randomAccessId + "'");
2705                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2706                 if(r < 0) return getResourceByKey(r);
2707                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2708                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2709                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2710                 if (cluster.hasResource(key, clusterTranslator))
2711                     return getResourceByKey(key);
2712             } catch (InvalidResourceReferenceException e) {
2713                 throw e;
2714             } catch (NumberFormatException e) {
2715                 throw new InvalidResourceReferenceException(e);
2716             } catch (Throwable e) {
2717                 e.printStackTrace();
2718                 throw new InvalidResourceReferenceException(e);
2719             } finally {
2720             }
2721             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2722         }
2723
2724         @Override
2725         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2726             try {
2727                 return true;
2728             } catch (Throwable e) {
2729                 e.printStackTrace();
2730                 throw new InvalidResourceReferenceException(e);
2731             } finally {
2732             }
2733         }
2734     }
2735
2736     // Copied from old SessionImpl
2737
2738     void check() {
2739         if (state.isClosed())
2740             throw new Error("Session closed.");
2741     }
2742
2743
2744
2745     public ResourceImpl getNewResource(long clusterId) {
2746         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2747         int newId;
2748         try {
2749             newId = cluster.createResource(clusterTranslator);
2750         } catch (DatabaseException e) {
2751             Logger.defaultLogError(e);
2752             return null;
2753         }
2754         return new ResourceImpl(resourceSupport, newId);
2755     }
2756
2757     public ResourceImpl getNewResource(Resource clusterSet)
2758     throws DatabaseException {
2759         long resourceId = clusterSet.getResourceId();
2760         Long clusterId = clusterSetsSupport.get(resourceId);
2761         if (null == clusterId)
2762             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2763         if (Constants.NewClusterId == clusterId) {
2764             clusterId = getService(ClusteringSupport.class).createCluster();
2765             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2766                 createdClusters.add(clusterId);
2767             clusterSetsSupport.put(resourceId, clusterId);
2768             return getNewResource(clusterId);
2769         } else {
2770             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2771             ResourceImpl result;
2772             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2773                 clusterId = getService(ClusteringSupport.class).createCluster();
2774                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2775                     createdClusters.add(clusterId);
2776                 clusterSetsSupport.put(resourceId, clusterId);
2777                 return getNewResource(clusterId);
2778             } else {
2779                 result = getNewResource(clusterId);
2780                 int resultKey = querySupport.getId(result);
2781                 long resultCluster = querySupport.getClusterId(resultKey);
2782                 if (clusterId != resultCluster)
2783                     clusterSetsSupport.put(resourceId, resultCluster);
2784                 return result;
2785             }
2786         }
2787     }
2788
2789     public void getNewClusterSet(Resource clusterSet)
2790     throws DatabaseException {
2791         if (DEBUG)
2792             System.out.println("new cluster set=" + clusterSet);
2793         long resourceId = clusterSet.getResourceId();
2794         if (clusterSetsSupport.containsKey(resourceId))
2795             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2796         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2797     }
2798     public boolean containsClusterSet(Resource clusterSet)
2799     throws ServiceException {
2800         long resourceId = clusterSet.getResourceId();
2801         return clusterSetsSupport.containsKey(resourceId);
2802     }
2803     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2804         Resource r = defaultClusterSet;
2805         defaultClusterSet = clusterSet;
2806         return r;
2807     }
2808     void printDiagnostics() {
2809
2810         if (DIAGNOSTICS) {
2811
2812             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2813             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2814             for(ClusterI cluster : clusterTable.getClusters()) {
2815                 try {
2816                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2817                         residentClusters.set(residentClusters.get() + 1);
2818                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2819                     }
2820                 } catch (DatabaseException e) {
2821                     Logger.defaultLogError(e);
2822                 }
2823             }
2824
2825             System.out.println("--------------------------------");
2826             System.out.println("Cluster information:");
2827             System.out.println("-amount of resident clusters=" + residentClusters.get());
2828             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2829
2830             for(ClusterI cluster : clusterTable.getClusters()) {
2831                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2832                 try {
2833                     if (!cluster.isLoaded())
2834                         System.out.println("not loaded]");
2835                     else if (cluster.isEmpty())
2836                         System.out.println("is empty]");
2837                     else {
2838                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2839                         System.out.print(",references=" + cluster.getReferenceCount());
2840                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2841                     }
2842                 } catch (DatabaseException e) {
2843                     Logger.defaultLogError(e);
2844                     System.out.println("is corrupted]");
2845                 }
2846             }
2847
2848             queryProvider2.printDiagnostics();
2849             System.out.println("--------------------------------");
2850         }
2851
2852     }
2853
2854     public void fireStartReadTransaction() {
2855
2856         if (DIAGNOSTICS)
2857             System.out.println("StartReadTransaction");
2858
2859         for (SessionEventListener listener : eventListeners) {
2860             try {
2861                 listener.readTransactionStarted();
2862             } catch (Throwable t) {
2863                 t.printStackTrace();
2864             }
2865         }
2866
2867     }
2868
2869     public void fireFinishReadTransaction() {
2870
2871         if (DIAGNOSTICS)
2872             System.out.println("FinishReadTransaction");
2873
2874         for (SessionEventListener listener : eventListeners) {
2875             try {
2876                 listener.readTransactionFinished();
2877             } catch (Throwable t) {
2878                 t.printStackTrace();
2879             }
2880         }
2881
2882     }
2883
2884     public void fireStartWriteTransaction() {
2885
2886         if (DIAGNOSTICS)
2887             System.out.println("StartWriteTransaction");
2888
2889         for (SessionEventListener listener : eventListeners) {
2890             try {
2891                 listener.writeTransactionStarted();
2892             } catch (Throwable t) {
2893                 t.printStackTrace();
2894             }
2895         }
2896
2897     }
2898
2899     public void fireFinishWriteTransaction() {
2900
2901         if (DIAGNOSTICS)
2902             System.out.println("FinishWriteTransaction");
2903
2904         for (SessionEventListener listener : eventListeners) {
2905             try {
2906                 listener.writeTransactionFinished();
2907             } catch (Throwable t) {
2908                 t.printStackTrace();
2909             }
2910         }
2911
2912         Indexing.resetDependenciesIndexingDisabled();
2913
2914     }
2915
2916     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2917         throw new Error("Not supported at the moment.");
2918     }
2919
2920     State getState() {
2921         return state;
2922     }
2923
2924     ClusterTable getClusterTable() {
2925         return clusterTable;
2926     }
2927
2928     public GraphSession getGraphSession() {
2929         return graphSession;
2930     }
2931
2932     public QueryProcessor getQueryProvider2() {
2933         return queryProvider2;
2934     }
2935
2936     ClientChangesImpl getClientChanges() {
2937         return clientChanges;
2938     }
2939
2940     boolean getWriteOnly() {
2941         return writeOnly;
2942     }
2943
2944     static int counter = 0;
2945
2946     public void onClusterLoaded(long clusterId) {
2947
2948         clusterTable.updateSize();
2949
2950     }
2951
2952
2953
2954
2955     /*
2956      * Implementation of the interface RequestProcessor
2957      */
2958
2959     @Override
2960     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2961
2962         assertNotSession();
2963         assertAlive();
2964
2965         assert(request != null);
2966
2967         final DataContainer<T> result = new DataContainer<T>();
2968         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2969
2970         syncRequest(request, new AsyncProcedure<T>() {
2971
2972             @Override
2973             public void execute(AsyncReadGraph graph, T t) {
2974                 result.set(t);
2975             }
2976
2977             @Override
2978             public void exception(AsyncReadGraph graph, Throwable t) {
2979                 exception.set(t);
2980             }
2981
2982         });
2983
2984         Throwable t = exception.get();
2985         if(t != null) {
2986             if(t instanceof DatabaseException) throw (DatabaseException)t;
2987             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2988         }
2989
2990         return result.get();
2991
2992     }
2993
2994 //    @Override
2995 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2996 //        assertNotSession();
2997 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2998 //    }
2999
3000     @Override
3001     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
3002         assertNotSession();
3003         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3004     }
3005
3006     @Override
3007     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3008         assertNotSession();
3009         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3010     }
3011
3012     @Override
3013     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3014         assertNotSession();
3015         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3016     }
3017
3018     @Override
3019     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3020         assertNotSession();
3021         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3022     }
3023
3024     @Override
3025     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3026
3027         assertNotSession();
3028
3029         assert(request != null);
3030
3031         final DataContainer<T> result = new DataContainer<T>();
3032         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3033
3034         syncRequest(request, new AsyncProcedure<T>() {
3035
3036             @Override
3037             public void execute(AsyncReadGraph graph, T t) {
3038                 result.set(t);
3039             }
3040
3041             @Override
3042             public void exception(AsyncReadGraph graph, Throwable t) {
3043                 exception.set(t);
3044             }
3045
3046         });
3047
3048         Throwable t = exception.get();
3049         if(t != null) {
3050             if(t instanceof DatabaseException) throw (DatabaseException)t;
3051             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3052         }
3053
3054         return result.get();
3055
3056     }
3057
3058     @Override
3059     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3060         assertNotSession();
3061         return syncRequest(request, (AsyncProcedure<T>)procedure);
3062     }
3063
3064     @Override
3065     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3066         assertNotSession();
3067         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3068     }
3069
3070     @Override
3071     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3072         assertNotSession();
3073         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3074     }
3075
3076     @Override
3077     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3078         assertNotSession();
3079         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3080     }
3081
3082     @Override
3083     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3084         assertNotSession();
3085         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3086     }
3087
3088     @Override
3089     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3090
3091         assertNotSession();
3092
3093         assert(request != null);
3094
3095         final ArrayList<T> result = new ArrayList<T>();
3096         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3097
3098         syncRequest(request, new SyncMultiProcedure<T>() {
3099
3100             @Override
3101             public void execute(ReadGraph graph, T t) {
3102                 synchronized(result) {
3103                     result.add(t);
3104                 }
3105             }
3106
3107             @Override
3108             public void finished(ReadGraph graph) {
3109             }
3110
3111             @Override
3112             public void exception(ReadGraph graph, Throwable t) {
3113                 exception.set(t);
3114             }
3115
3116
3117         });
3118
3119         Throwable t = exception.get();
3120         if(t != null) {
3121             if(t instanceof DatabaseException) throw (DatabaseException)t;
3122             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3123         }
3124
3125         return result;
3126
3127     }
3128
3129     @Override
3130     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3131         assertNotSession();
3132         throw new Error("Not implemented!");
3133     }
3134
3135     @Override
3136     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3137         assertNotSession();
3138         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3139     }
3140
3141     @Override
3142     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3143         assertNotSession();
3144         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3145     }
3146
3147     @Override
3148     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3149         assertNotSession();
3150         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3151     }
3152
3153     @Override
3154     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3155
3156         assertNotSession();
3157
3158         assert(request != null);
3159
3160         final ArrayList<T> result = new ArrayList<T>();
3161         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3162
3163         syncRequest(request, new AsyncMultiProcedure<T>() {
3164
3165             @Override
3166             public void execute(AsyncReadGraph graph, T t) {
3167                 synchronized(result) {
3168                     result.add(t);
3169                 }
3170             }
3171
3172             @Override
3173             public void finished(AsyncReadGraph graph) {
3174             }
3175
3176             @Override
3177             public void exception(AsyncReadGraph graph, Throwable t) {
3178                 exception.set(t);
3179             }
3180
3181
3182         });
3183
3184         Throwable t = exception.get();
3185         if (t != null) {
3186             if (t instanceof DatabaseException)
3187                 throw (DatabaseException) t;
3188             else
3189                 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3190         }
3191
3192         return result;
3193
3194     }
3195
3196     @Override
3197     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3198         assertNotSession();
3199         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3200     }
3201
3202     @Override
3203     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3204         assertNotSession();
3205         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3206     }
3207
3208     @Override
3209     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3210         assertNotSession();
3211        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3212     }
3213
3214     @Override
3215     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3216         assertNotSession();
3217         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3218     }
3219
3220     @Override
3221     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3222         assertNotSession();
3223         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3224     }
3225
3226
3227     @Override
3228     public <T> void asyncRequest(Read<T> request) {
3229
3230         asyncRequest(request, new ProcedureAdapter<T>() {
3231             @Override
3232             public void exception(Throwable t) {
3233                 t.printStackTrace();
3234             }
3235         });
3236
3237     }
3238
3239     @Override
3240     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3241         asyncRequest(request, (AsyncProcedure<T>)procedure);
3242     }
3243
3244     @Override
3245     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3246         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3247     }
3248
3249     @Override
3250     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3251         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3252     }
3253
3254     @Override
3255     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3256         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3257     }
3258
3259     @Override
3260     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3261         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3262     }
3263
3264     @Override
3265     final public <T> void asyncRequest(final AsyncRead<T> request) {
3266
3267         assert(request != null);
3268
3269         asyncRequest(request, new ProcedureAdapter<T>() {
3270             @Override
3271             public void exception(Throwable t) {
3272                 t.printStackTrace();
3273             }
3274         });
3275
3276     }
3277
3278     @Override
3279     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3280         scheduleRequest(request, procedure, procedure, null);
3281     }
3282
3283     @Override
3284     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3285         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3286     }
3287
3288     @Override
3289     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3290         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3291     }
3292
3293     @Override
3294     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3295         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3296     }
3297
3298     @Override
3299     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3300         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3301     }
3302
3303     @Override
3304     public <T> void asyncRequest(MultiRead<T> request) {
3305
3306         assert(request != null);
3307
3308         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3309             @Override
3310             public void exception(ReadGraph graph, Throwable t) {
3311                 t.printStackTrace();
3312             }
3313         });
3314
3315     }
3316
3317     @Override
3318     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3319         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3320     }
3321
3322     @Override
3323     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3324         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3325     }
3326
3327     @Override
3328     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3329         scheduleRequest(request, procedure, null);
3330     }
3331
3332     @Override
3333     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3334         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3335     }
3336
3337     @Override
3338     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3339
3340         assert(request != null);
3341
3342         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3343             @Override
3344             public void exception(AsyncReadGraph graph, Throwable t) {
3345                 t.printStackTrace();
3346             }
3347         });
3348
3349     }
3350
3351     @Override
3352     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3353         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3354     }
3355
3356     @Override
3357     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3358         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3359     }
3360
3361     @Override
3362     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3363         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3364     }
3365
3366     @Override
3367     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3368         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3369     }
3370
3371     @Override
3372     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3373         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3374     }
3375
3376     @Override
3377     final public <T> void asyncRequest(final ExternalRead<T> request) {
3378
3379         assert(request != null);
3380
3381         asyncRequest(request, new ProcedureAdapter<T>() {
3382             @Override
3383             public void exception(Throwable t) {
3384                 t.printStackTrace();
3385             }
3386         });
3387
3388     }
3389
3390     @Override
3391     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3392         asyncRequest(request, (Procedure<T>)procedure);
3393     }
3394
3395     boolean sameProvider(Write request) {
3396         if(writeState.getGraph().provider != null) {
3397             return writeState.getGraph().provider.equals(request.getProvider());
3398         } else {
3399             return request.getProvider() == null;
3400         }
3401     }
3402
3403     boolean plainWrite(WriteGraphImpl graph) {
3404         if(graph == null) return false;
3405         if(graph.writeSupport.writeOnly()) return false;
3406         return true;
3407     }
3408
3409
    /**
     * Shared thread group named "Session Thread Group".
     * NOTE(review): presumably the group session threads are created in; its users
     * are not visible in this part of the file, so confirm before relying on that.
     */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3411
3412     private void assertNotSession() throws DatabaseException {
3413         Thread current = Thread.currentThread();
3414         if (sessionThreads.contains(current))
3415             throw new ServiceException("Caller is already inside a transaction.");
3416     }
3417
3418     void assertAlive() {
3419         if (!state.isAlive())
3420             throw new RuntimeDatabaseException("Session has been shut down.");
3421     }
3422
3423     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3424         return querySupport.getValueStream(graph, querySupport.getId(resource));
3425     }
3426
3427     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3428         return querySupport.getValue(graph, querySupport.getId(resource));
3429     }
3430
3431     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3432         acquire(semaphore, request, null);
3433     }
3434
3435     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3436         assertAlive();
3437         try {
3438             long tryCount = 0;
3439             // Loop until the semaphore is acquired reporting requests that take a long time.
3440             while (true) {
3441
3442                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3443                     // Acquired OK.
3444                     return;
3445                 } else {
3446                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3447                         ++tryCount;
3448                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3449                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3450                                 * tryCount;
3451                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3452                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3453                     }
3454                 }
3455
3456                 assertAlive();
3457
3458             }
3459         } catch (InterruptedException e) {
3460             e.printStackTrace();
3461             // FIXME: Should perhaps do something else in this case ??
3462         }
3463     }
3464
3465
3466
3467 //    @Override
3468 //    public boolean holdOnToTransactionAfterCancel() {
3469 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3470 //    }
3471 //
3472 //    @Override
3473 //    public boolean holdOnToTransactionAfterCommit() {
3474 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3475 //    }
3476 //
3477 //    @Override
3478 //    public boolean holdOnToTransactionAfterRead() {
3479 //        return transactionPolicy.holdOnToTransactionAfterRead();
3480 //    }
3481 //
3482 //    @Override
3483 //    public void onRelinquish() {
3484 //        transactionPolicy.onRelinquish();
3485 //    }
3486 //
3487 //    @Override
3488 //    public void onRelinquishDone() {
3489 //        transactionPolicy.onRelinquishDone();
3490 //    }
3491 //
3492 //    @Override
3493 //    public void onRelinquishError() {
3494 //        transactionPolicy.onRelinquishError();
3495 //    }
3496
3497
3498     @Override
3499     public Session getSession() {
3500         return this;
3501     }
3502
3503
    /**
     * Maps the given virtual graph to a provider; implemented by concrete subclasses.
     * NOTE(review): the exact mapping is defined by the implementations, which are
     * not visible here; confirm there.
     *
     * @param virtualGraph the virtual graph to resolve
     * @return the provider chosen by the implementation
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /**
     * Supplies a new resource; implemented by concrete subclasses.
     *
     * @return the resource produced by the implementation
     * @throws DatabaseException as determined by the implementation
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3506
3507     public void ceased(int thread) {
3508
3509         requestManager.ceased(thread);
3510
3511     }
3512
3513     public int getAmountOfQueryThreads() {
3514         // This must be a power of two
3515         return 1;
3516 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3517     }
3518
3519     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3520
3521         return new ResourceImpl(resourceSupport, key);
3522
3523     }
3524
3525     public void acquireWriteOnly() {
3526         writeOnly = true;
3527     }
3528
3529     public void releaseWriteOnly(ReadGraphImpl graph) {
3530         writeOnly = false;
3531         queryProvider2.releaseWrite(graph);
3532     }
3533
3534     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3535
3536         long start = System.nanoTime();
3537
3538         while(dirtyPrimitives) {
3539             dirtyPrimitives = false;
3540             getQueryProvider2().performDirtyUpdates(writer);
3541             getQueryProvider2().performScheduledUpdates(writer);
3542         }
3543
3544         fireMetadataListeners(writer, clientChanges);
3545
3546         if(DebugPolicy.PERFORMANCE_DATA) {
3547             long end = System.nanoTime();
3548             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3549         }
3550
3551     }
3552
3553     void removeTemporaryData() {
3554         File platform = Platform.getLocation().toFile();
3555         File tempFiles = new File(platform, "tempFiles");
3556         File temp = new File(tempFiles, "db");
3557         if(!temp.exists()) 
3558             return;
3559         try {
3560             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3561                 @Override
3562                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3563                     try {
3564                         Files.delete(file);
3565                     } catch (IOException e) {
3566                         Logger.defaultLogError(e);
3567                     }
3568                     return FileVisitResult.CONTINUE;
3569                 }
3570             });
3571         } catch (IOException e) {
3572             Logger.defaultLogError(e);
3573         }
3574     }
3575
3576     public void handleCreatedClusters() {
3577         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3578             createdClusters.forEach(new TLongProcedure() {
3579
3580                 @Override
3581                 public boolean execute(long value) {
3582                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3583                     cluster.setImmutable(true, clusterTranslator);
3584                     return true;
3585                 }
3586             });
3587         }
3588         createdClusters.clear();
3589     }
3590
3591     @Override
3592     public Object getModificationCounter() {
3593         return queryProvider2.modificationCounter;
3594     }
3595     
3596     @Override
3597     public void markUndoPoint() {
3598         state.setCombine(false);
3599     }
3600
3601 }