gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Remove usage of deprecated SimanticsUI-methods
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.GraphChangeListenerSupport;
147 import org.simantics.db.service.InitSupport;
148 import org.simantics.db.service.LifecycleSupport;
149 import org.simantics.db.service.ManagementSupport;
150 import org.simantics.db.service.QueryControl;
151 import org.simantics.db.service.SerialisationSupport;
152 import org.simantics.db.service.ServerInformation;
153 import org.simantics.db.service.ServiceActivityMonitor;
154 import org.simantics.db.service.SessionEventSupport;
155 import org.simantics.db.service.SessionMonitorSupport;
156 import org.simantics.db.service.SessionUserSupport;
157 import org.simantics.db.service.StatementSupport;
158 import org.simantics.db.service.TransactionPolicySupport;
159 import org.simantics.db.service.TransactionSupport;
160 import org.simantics.db.service.TransferableGraphSupport;
161 import org.simantics.db.service.UndoRedoSupport;
162 import org.simantics.db.service.VirtualGraphSupport;
163 import org.simantics.db.service.XSupport;
164 import org.simantics.layer0.Layer0;
165 import org.simantics.utils.DataContainer;
166 import org.simantics.utils.Development;
167 import org.simantics.utils.threads.logger.ITask;
168 import org.simantics.utils.threads.logger.ThreadLogger;
169
170 import gnu.trove.procedure.TLongProcedure;
171 import gnu.trove.set.hash.TLongHashSet;
172
173
174 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
175
176     protected static final boolean DEBUG        = false;
177
178     private static final boolean DIAGNOSTICS       = false;
179
180     private TransactionPolicySupport transactionPolicy;
181
182     final private ClusterControl clusterControl;
183
184     final protected BuiltinSupportImpl builtinSupport;
185     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
186     final protected ClusterSetsSupport clusterSetsSupport;
187     final protected LifecycleSupportImpl lifecycleSupport;
188
189     protected QuerySupportImpl querySupport;
190     protected ResourceSupportImpl resourceSupport;
191     protected WriteSupport writeSupport;
192     public ClusterTranslator clusterTranslator;
193
194     boolean dirtyPrimitives = false;
195
196     public static final int SERVICE_MODE_CREATE = 2;
197     public static final int SERVICE_MODE_ALLOW = 1;
198     public int serviceMode = 0;
199     public boolean createdImmutableClusters = false;
200     public TLongHashSet createdClusters = new TLongHashSet();
201
202     private Layer0 L0;
203
204     /**
205      * The service locator maintained by the workbench. These services are
206      * initialized by the workbench during the <code>init</code> method.
207      */
208     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
209
210     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
211
212     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
213
214     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
215
216     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
217
218     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
219
220     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
221
222     final protected State                                        state              = new State();
223
224     protected GraphSession                                       graphSession       = null;
225
226     protected SessionManager                                     sessionManagerImpl = null;
227
228     protected UserAuthenticationAgent                            authAgent          = null;
229
230     protected UserAuthenticator                                  authenticator      = null;
231
232     protected Resource                                           user               = null;
233
234     protected ClusterStream                                      clusterStream      = null;
235
236     protected SessionRequestManager                         requestManager = null;
237
238     public ClusterTable                                       clusterTable       = null;
239
240     public QueryProcessor                                     queryProvider2 = null;
241
242     ClientChangesImpl                                  clientChanges      = null;
243
244     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
245
246 //    protected long                                               newClusterId       = Constants.NullClusterId;
247
248     protected int                                                flushCounter       = 0;
249
250     protected boolean                                            writeOnly          = false;
251
252     WriteState<?>                                                writeState = null;
253     WriteStateBase<?>                                            delayedWriteState = null;
254     protected Resource defaultClusterSet = null; // If not null then used for newResource().
255
256     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
257
258 //        if (authAgent == null)
259 //            throw new IllegalArgumentException("null authentication agent");
260
261         File t = StaticSessionProperties.virtualGraphStoragePath;
262         if (null == t)
263             t = new File(".");
264         this.clusterTable = new ClusterTable(this, t);
265         this.builtinSupport = new BuiltinSupportImpl(this);
266         this.sessionManagerImpl = sessionManagerImpl;
267         this.user = null;
268 //        this.authAgent = authAgent;
269
270         serviceLocator.registerService(Session.class, this);
271         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
272         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
273         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
274         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
275         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
276         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
277         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
278         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
279         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
280         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
281         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
282         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
283         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
284         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
285         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
286         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
287         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
288         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
289         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
290         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
291         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
292         ServiceActivityUpdaterForWriteTransactions.register(this);
293
294         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
295         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
296         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
297         this.lifecycleSupport = new LifecycleSupportImpl(this);
298         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
299         this.transactionPolicy = new TransactionPolicyRelease();
300         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
301         this.clusterControl = new ClusterControlImpl(this);
302         serviceLocator.registerService(ClusterControl.class, clusterControl);
303         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
304         this.clusterSetsSupport.setReadDirectory(t.toPath());
305         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
306         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
307     }
308
309     /*
310      *
311      * Session interface
312      */
313
314
315     @Override
316     public Resource getRootLibrary() {
317         return queryProvider2.getRootLibraryResource();
318     }
319
320     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
321         if (!graphSession.dbSession.refreshEnabled())
322             return;
323         try {
324             getClusterTable().refresh(csid, this, clusterUID);
325         } catch (Throwable t) {
326             Logger.defaultLogError("Refesh failed.", t);
327         }
328     }
329
330     static final class TaskHelper {
331         private final String name;
332         private Object result;
333         final Semaphore sema = new Semaphore(0);
334         private Throwable throwable = null;
335         final Consumer<DatabaseException> callback = e -> {
336             synchronized (TaskHelper.this) {
337                 throwable = e;
338             }
339         };
340         final Procedure<Object> proc = new Procedure<Object>() {
341             @Override
342             public void execute(Object result) {
343                 callback.accept(null);
344             }
345             @Override
346             public void exception(Throwable t) {
347                 if (t instanceof DatabaseException)
348                     callback.accept((DatabaseException)t);
349                 else
350                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
351             }
352         };
353         final WriteTraits writeTraits = new WriteTraits() {};
354         TaskHelper(String name) {
355             this.name = name;
356         }
357         @SuppressWarnings("unchecked")
358         <T> T getResult() {
359             return (T)result;
360         }
361         void setResult(Object result) {
362             this.result = result;
363         }
364 //        Throwable throwableGet() {
365 //            return throwable;
366 //        }
367         synchronized void throwableSet(Throwable t) {
368             throwable = t;
369         }
370 //        void throwableSet(String t) {
371 //            throwable = new InternalException("" + name + " operation failed. " + t);
372 //        }
373         synchronized void throwableCheck()
374         throws DatabaseException {
375             if (null != throwable)
376                 if (throwable instanceof DatabaseException)
377                     throw (DatabaseException)throwable;
378                 else
379                     throw new DatabaseException("Undo operation failed.", throwable);
380         }
381         void throw_(String message)
382         throws DatabaseException {
383             throw new DatabaseException("" + name + " operation failed. " + message);
384         }
385     }
386
387
388
389     private ListenerBase getListenerBase(Object procedure) {
390         if (procedure instanceof ListenerBase)
391             return (ListenerBase) procedure;
392         else
393             return null;
394     }
395
396     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
397         scheduleRequest(request, callback, notify, null);
398     }
399
400     /* (non-Javadoc)
401      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
402      */
403     @Override
404     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
405
406         assert (request != null);
407
408         if(Development.DEVELOPMENT) {
409             try {
410             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
411                 System.err.println("schedule write '" + request + "'");
412             } catch (Throwable t) {
413                 Logger.defaultLogError(t);
414             }
415         }
416
417         requestManager.scheduleWrite(new SessionTask(null) {
418
419             @Override
420             public void run(int thread) {
421
422                 if(Development.DEVELOPMENT) {
423                     try {
424                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
425                         System.err.println("perform write '" + request + "'");
426                     } catch (Throwable t) {
427                         Logger.defaultLogError(t);
428                     }
429                 }
430
431                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
432
433                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
434
435                 try {
436
437                     flushCounter = 0;
438                     Disposable.safeDispose(clientChanges);
439                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
440
441                     VirtualGraph vg = getProvider(request.getProvider());
442
443                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
444                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
445
446                         @Override
447                         public void execute(Object result) {
448                             if(callback != null) callback.accept(null);
449                         }
450
451                         @Override
452                         public void exception(Throwable t) {
453                             if(callback != null) callback.accept((DatabaseException)t);
454                         }
455
456                     });
457
458                     assert (null != writer);
459 //                    writer.state.barrier.inc();
460
461                     try {
462                         request.perform(writer);
463                         assert (null != writer);
464                     } catch (Throwable t) {
465                         if (!(t instanceof CancelTransactionException))
466                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
467                         writeState.except(t);
468                     } finally {
469 //                        writer.state.barrier.dec();
470 //                        writer.waitAsync(request);
471                     }
472
473
474                     assert(!queryProvider2.cache.dirty);
475
476                 } catch (Throwable e) {
477
478                     // Log it first, just to be safe that the error is always logged.
479                     if (!(e instanceof CancelTransactionException))
480                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
481
482 //                    writeState.getGraph().state.barrier.dec();
483 //                    writeState.getGraph().waitAsync(request);
484
485                     writeState.except(e);
486
487 //                    try {
488 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
489 //                        // All we can do here is to log those, can't really pass them anywhere.
490 //                        if (callback != null) {
491 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
492 //                            else callback.run(new DatabaseException(e));
493 //                        }
494 //                    } catch (Throwable e2) {
495 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
496 //                    }
497 //                    boolean empty = clusterStream.reallyFlush();
498 //                    int callerThread = -1;
499 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
500 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
501 //                    try {
502 //                        // Send all local changes to server so it can calculate correct reverse change set.
503 //                        // This will call clusterStream.accept().
504 //                        state.cancelCommit(context, clusterStream);
505 //                        if (!empty) {
506 //                            if (!context.isOk()) // this is a blocking operation
507 //                                throw new InternalException("Cancel failed. This should never happen.");
508 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
509 //                        }
510 //                        state.cancelCommit2(context, clusterStream);
511 //                    } catch (DatabaseException e3) {
512 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
513 //                    } finally {
514 //                        clusterStream.setOff(false);
515 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
516 //                    }
517
518                 } finally {
519
520                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
521
522                 }
523
524                 task.finish();
525
526                 if(Development.DEVELOPMENT) {
527                     try {
528                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
529                         System.err.println("finish write '" + request + "'");
530                     } catch (Throwable t) {
531                         Logger.defaultLogError(t);
532                     }
533                 }
534
535             }
536
537         }, combine);
538
539     }
540
541     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
542         scheduleRequest(request, procedure, notify, null);
543     }
544
545     /* (non-Javadoc)
546      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
547      */
548     @Override
549     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
550
551         assert (request != null);
552
553         requestManager.scheduleWrite(new SessionTask(null) {
554
555             @Override
556             public void run(int thread) {
557
558                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
559
560                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
561
562                 flushCounter = 0;
563                 Disposable.safeDispose(clientChanges);
564                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
565
566                 VirtualGraph vg = getProvider(request.getProvider());
567                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
568
569                 try {
570                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571                     writeState = writeStateT;
572
573                     assert (null != writer);
574 //                    writer.state.barrier.inc();
575                     writeStateT.setResult(request.perform(writer));
576                     assert (null != writer);
577
578 //                    writer.state.barrier.dec();
579 //                    writer.waitAsync(null);
580
581                 } catch (Throwable e) {
582
583 //                    writer.state.barrier.dec();
584 //                    writer.waitAsync(null);
585
586                     writeState.except(e);
587
588 //                  state.stopWriteTransaction(clusterStream);
589 //
590 //              } catch (Throwable e) {
591 //                  // Log it first, just to be safe that the error is always logged.
592 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
593 //
594 //                  try {
595 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 //                      // All we can do here is to log those, can't really pass them anywhere.
597 //                      if (procedure != null) {
598 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 //                          else procedure.exception(new DatabaseException(e));
600 //                      }
601 //                  } catch (Throwable e2) {
602 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
603 //                  }
604 //
605 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
606 //
607 //                  state.stopWriteTransaction(clusterStream);
608
609                 } finally {
610                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
611                 }
612
613 //              if(notify != null) notify.release();
614
615                 task.finish();
616
617             }
618
619         }, combine);
620
621     }
622
623     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624         scheduleRequest(request, callback, notify, null);
625     }
626
627     /* (non-Javadoc)
628      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
629      */
630     @Override
631     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
632
633         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
634
635         assert (request != null);
636
637         requestManager.scheduleWrite(new SessionTask(null) {
638
639             @Override
640             public void run(int thread) {
641                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
642
643                 Procedure<Object> stateProcedure = new Procedure<Object>() {
644                     @Override
645                     public void execute(Object result) {
646                         if (callback != null)
647                             callback.accept(null);
648                     }
649                     @Override
650                     public void exception(Throwable t) {
651                         if (callback != null) {
652                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653                             else callback.accept(new DatabaseException(t));
654                         } else
655                             Logger.defaultLogError("Unhandled exception", t);
656                     }
657                 };
658
659                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661                 DelayedWriteGraph dwg = null;
662 //                newGraph.state.barrier.inc();
663
664                 try {
665                     dwg = new DelayedWriteGraph(newGraph);
666                     request.perform(dwg);
667                 } catch (Throwable e) {
668                     delayedWriteState.except(e);
669                     total.finish();
670                     dwg.close();
671                     return;
672                 } finally {
673 //                    newGraph.state.barrier.dec();
674 //                    newGraph.waitAsync(request);
675                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
676                 }
677
678                 delayedWriteState = null;
679
680                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682
683                 flushCounter = 0;
684                 Disposable.safeDispose(clientChanges);
685                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686
687                 acquireWriteOnly();
688
689                 VirtualGraph vg = getProvider(request.getProvider());
690                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
691
692                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
693
694                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
695
696                 assert (null != writer);
697 //                writer.state.barrier.inc();
698
699                 try {
700                     // Cannot cancel
701                     dwg.commit(writer, request);
702
703                         if(defaultClusterSet != null) {
704                                 XSupport xs = getService(XSupport.class);
705                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706                                 ClusteringSupport cs = getService(ClusteringSupport.class);
707                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
708                         }
709
710                     // This makes clusters available from server
711                     clusterStream.reallyFlush();
712
713                     releaseWriteOnly(writer);
714                     handleUpdatesAndMetadata(writer);
715
716                 } catch (ServiceException e) {
717 //                    writer.state.barrier.dec();
718 //                    writer.waitAsync(null);
719
720                     // These shall be requested from server
721                     clusterTable.removeWriteOnlyClusters();
722                     // This makes clusters available from server
723                     clusterStream.reallyFlush();
724
725                     releaseWriteOnly(writer);
726                     writeState.except(e);
727                 } finally {
728                     // Debugging & Profiling
729                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
730                     task2.finish();
731                     total.finish();
732                 }
733             }
734
735         }, combine);
736
737     }
738
739     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740         scheduleRequest(request, procedure, notify, null);
741     }
742
743     /* (non-Javadoc)
744      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
745      */
746     @Override
747     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748         throw new Error("Not implemented");
749     }
750
751     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754             createdClusters.add(cluster.clusterId);
755         }
756         return cluster;
757     }
758
759     class WriteOnlySupport implements WriteSupport {
760
761         ClusterStream stream;
762         ClusterImpl currentCluster;
763
764         public WriteOnlySupport() {
765             this.stream = clusterStream;
766         }
767
768         @Override
769         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771         }
772
773         @Override
774         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
775
776             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
777
778             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779                 if(s != queryProvider2.getRootLibrary())
780                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
781
782             try {
783                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784             } catch (DatabaseException e) {
785                 Logger.defaultLogError(e);
786                 //throw e;
787             }
788
789             clientChanges.invalidate(s);
790
791             if (cluster.isWriteOnly())
792                 return;
793             queryProvider2.updateStatements(s, p);
794
795         }
796
797         @Override
798         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800         }
801
802         @Override
803         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
804
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808             } catch (DatabaseException e) {
809                 Logger.defaultLogError(e);
810             }
811
812             clientChanges.invalidate(rid);
813
814             if (cluster.isWriteOnly())
815                 return;
816             queryProvider2.updateValue(rid);
817
818         }
819
820         @Override
821         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
822
823             if(amount < 65536) {
824                 claimValue(provider,resource, reader.readBytes(null, amount));
825                 return;
826             }
827
828             byte[] bytes = new byte[65536];
829
830             int rid = ((ResourceImpl)resource).id;
831             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832             try {
833                 int left = amount;
834                 while(left > 0) {
835                     int block = Math.min(left, 65536);
836                     reader.readBytes(bytes, block);
837                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
838                     left -= block;
839                 }
840             } catch (DatabaseException e) {
841                 Logger.defaultLogError(e);
842             }
843
844             clientChanges.invalidate(rid);
845
846             if (cluster.isWriteOnly())
847                 return;
848             queryProvider2.updateValue(rid);
849
850         }
851
852         private void maintainCluster(ClusterImpl before, ClusterI after_) {
853             if(after_ != null && after_ != before) {
854                 ClusterImpl after = (ClusterImpl)after_;
855                 if(currentCluster == before) {
856                     currentCluster = after;
857                 }
858                 clusterTable.replaceCluster(after);
859             }
860         }
861
862         public int createResourceKey(int foreignCounter) throws DatabaseException {
863             if(currentCluster == null) {
864                 currentCluster = getNewResourceCluster();
865             }
866             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
867                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868                 newCluster.foreignLookup = new byte[foreignCounter];
869                 currentCluster = newCluster;
870                 if (DEBUG)
871                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
872             }
873             return currentCluster.createResource(clusterTranslator);
874         }
875
876         @Override
877         public Resource createResource(VirtualGraph provider) throws DatabaseException {
878             if(currentCluster == null) {
879                 if (null != defaultClusterSet) {
880                         ResourceImpl result = getNewResource(defaultClusterSet);
881                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
882                     return result;
883                 } else {
884                     currentCluster = getNewResourceCluster();
885                 }
886             }
887             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888                 if (null != defaultClusterSet) {
889                         ResourceImpl result = getNewResource(defaultClusterSet);
890                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
891                     return result;
892                 } else {
893                     currentCluster = getNewResourceCluster();
894                 }
895             }
896             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
897         }
898
899         @Override
900         public Resource createResource(VirtualGraph provider, long clusterId)
901         throws DatabaseException {
902             return getNewResource(clusterId);
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, Resource clusterSet)
907         throws DatabaseException {
908             return getNewResource(clusterSet);
909         }
910
911         @Override
912         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913                 throws DatabaseException {
914             getNewClusterSet(clusterSet);
915         }
916
917         @Override
918         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919         throws ServiceException {
920             return containsClusterSet(clusterSet);
921         }
922
923         public void selectCluster(long cluster) {
924                 currentCluster = clusterTable.getClusterByClusterId(cluster);
925                 long setResourceId = clusterSetsSupport.getSet(cluster);
926                 clusterSetsSupport.put(setResourceId, cluster);
927         }
928
929         @Override
930         public Resource setDefaultClusterSet(Resource clusterSet)
931         throws ServiceException {
932                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
933                 if(clusterSet != null) {
934                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
935                         currentCluster = clusterTable.getClusterByClusterId(id);
936                         return result;
937                 } else {
938                         currentCluster = null;
939                         return null;
940                 }
941         }
942
943         @Override
944         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
945
946                  provider = getProvider(provider);
947              if (null == provider) {
948                  int key = ((ResourceImpl)resource).id;
949
950
951
952                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 //                      if(key != queryProvider2.getRootLibrary())
955 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
956
957 //                 try {
958 //                     cluster.removeValue(key, clusterTranslator);
959 //                 } catch (DatabaseException e) {
960 //                     Logger.defaultLogError(e);
961 //                     return;
962 //                 }
963
964                  clusterTable.writeOnlyInvalidate(cluster);
965
966                  try {
967                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969                          clusterTranslator.removeValue(cluster);
970                  } catch (DatabaseException e) {
971                          Logger.defaultLogError(e);
972                  }
973
974                  queryProvider2.invalidateResource(key);
975                  clientChanges.invalidate(key);
976
977              } else {
978                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979                  queryProvider2.updateValue(querySupport.getId(resource));
980                  clientChanges.claimValue(resource);
981              }
982
983
984         }
985
986         @Override
987         public void flush(boolean intermediate) {
988             throw new UnsupportedOperationException();
989         }
990
991         @Override
992         public void flushCluster() {
993             clusterTable.flushCluster(graphSession);
994             if(defaultClusterSet != null) {
995                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
996             }
997             currentCluster = null;
998         }
999
1000         @Override
1001         public void flushCluster(Resource r) {
1002             throw new UnsupportedOperationException("flushCluster resource " + r);
1003         }
1004
1005         @Override
1006         public void gc() {
1007         }
1008
1009         @Override
1010         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1011                 Resource object) {
1012
1013                 int s = ((ResourceImpl)subject).id;
1014                 int p = ((ResourceImpl)predicate).id;
1015                 int o = ((ResourceImpl)object).id;
1016
1017                 provider = getProvider(provider);
1018                 if (null == provider) {
1019
1020                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021                         clusterTable.writeOnlyInvalidate(cluster);
1022
1023                         try {
1024
1025                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026
1027                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1028                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1029
1030                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1031                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033                                 clusterTranslator.removeStatement(cluster);
1034
1035                                 queryProvider2.invalidateResource(s);
1036                                 clientChanges.invalidate(s);
1037
1038                         } catch (DatabaseException e) {
1039
1040                                 Logger.defaultLogError(e);
1041
1042                         }
1043
1044                         return true;
1045
1046                 } else {
1047                         
1048                         ((VirtualGraphImpl)provider).deny(s, p, o);
1049                         queryProvider2.invalidateResource(s);
1050                         clientChanges.invalidate(s);
1051
1052                         return true;
1053
1054                 }
1055
1056         }
1057
1058         @Override
1059         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1060             throw new UnsupportedOperationException();
1061         }
1062
1063         @Override
1064         public boolean writeOnly() {
1065             return true;
1066         }
1067
1068         @Override
1069         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1070             writeSupport.performWriteRequest(graph, request);
1071         }
1072
1073         @Override
1074         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1075             throw new UnsupportedOperationException();
1076         }
1077
1078         @Override
1079         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1080             throw new UnsupportedOperationException();
1081         }
1082
1083         @Override
1084         public <T> void addMetadata(Metadata data) throws ServiceException {
1085             writeSupport.addMetadata(data);
1086         }
1087
1088         @Override
1089         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1090             return writeSupport.getMetadata(clazz);
1091         }
1092
1093         @Override
1094         public TreeMap<String, byte[]> getMetadata() {
1095             return writeSupport.getMetadata();
1096         }
1097
1098         @Override
1099         public void commitDone(WriteTraits writeTraits, long csid) {
1100             writeSupport.commitDone(writeTraits, csid);
1101         }
1102
1103         @Override
1104         public void clearUndoList(WriteTraits writeTraits) {
1105             writeSupport.clearUndoList(writeTraits);
1106         }
1107         @Override
1108         public int clearMetadata() {
1109             return writeSupport.clearMetadata();
1110         }
1111
1112                 @Override
1113                 public void startUndo() {
1114                         writeSupport.startUndo();
1115                 }
1116     }
1117
1118     class VirtualWriteOnlySupport implements WriteSupport {
1119
1120 //        @Override
1121 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1122 //                Resource object) {
1123 //            throw new UnsupportedOperationException();
1124 //        }
1125
1126         @Override
1127         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1128
1129             TransientGraph impl = (TransientGraph)provider;
1130             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1131             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1132             clientChanges.claim(subject, predicate, object);
1133
1134         }
1135
1136         @Override
1137         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1138
1139             TransientGraph impl = (TransientGraph)provider;
1140             impl.claim(subject, predicate, object);
1141             getQueryProvider2().updateStatements(subject, predicate);
1142             clientChanges.claim(subject, predicate, object);
1143
1144         }
1145
1146         @Override
1147         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1148             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1149         }
1150
1151         @Override
1152         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1153             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1154             getQueryProvider2().updateValue(resource);
1155             clientChanges.claimValue(resource);
1156         }
1157
1158         @Override
1159         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1160             byte[] value = reader.readBytes(null, amount);
1161             claimValue(provider, resource, value);
1162         }
1163
1164         @Override
1165         public Resource createResource(VirtualGraph provider) {
1166             TransientGraph impl = (TransientGraph)provider;
1167             return impl.getResource(impl.newResource(false));
1168         }
1169
1170         @Override
1171         public Resource createResource(VirtualGraph provider, long clusterId) {
1172             throw new UnsupportedOperationException();
1173         }
1174
1175         @Override
1176         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1177         throws DatabaseException {
1178             throw new UnsupportedOperationException();
1179         }
1180
1181         @Override
1182         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1183         throws DatabaseException {
1184             throw new UnsupportedOperationException();
1185         }
1186
1187         @Override
1188         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1189         throws ServiceException {
1190             throw new UnsupportedOperationException();
1191         }
1192
1193         @Override
1194         public Resource setDefaultClusterSet(Resource clusterSet)
1195         throws ServiceException {
1196                 return null;
1197         }
1198
1199         @Override
1200         public void denyValue(VirtualGraph provider, Resource resource) {
1201             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1202             getQueryProvider2().updateValue(querySupport.getId(resource));
1203             // NOTE: this only keeps track of value changes by-resource.
1204             clientChanges.claimValue(resource);
1205         }
1206
1207         @Override
1208         public void flush(boolean intermediate) {
1209             throw new UnsupportedOperationException();
1210         }
1211
1212         @Override
1213         public void flushCluster() {
1214             throw new UnsupportedOperationException();
1215         }
1216
1217         @Override
1218         public void flushCluster(Resource r) {
1219             throw new UnsupportedOperationException("Resource " + r);
1220         }
1221
1222         @Override
1223         public void gc() {
1224         }
1225
1226         @Override
1227         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1228             TransientGraph impl = (TransientGraph) provider;
1229             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1230             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1231             clientChanges.deny(subject, predicate, object);
1232             return true;
1233         }
1234
1235         @Override
1236         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1237             throw new UnsupportedOperationException();
1238         }
1239
1240         @Override
1241         public boolean writeOnly() {
1242             return true;
1243         }
1244
1245         @Override
1246         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1247             throw new UnsupportedOperationException();
1248         }
1249
1250         @Override
1251         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1252             throw new UnsupportedOperationException();
1253         }
1254
1255         @Override
1256         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1257             throw new UnsupportedOperationException();
1258         }
1259
1260         @Override
1261         public <T> void addMetadata(Metadata data) throws ServiceException {
1262             throw new UnsupportedOperationException();
1263         }
1264
1265         @Override
1266         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1267             throw new UnsupportedOperationException();
1268         }
1269
1270         @Override
1271         public TreeMap<String, byte[]> getMetadata() {
1272             throw new UnsupportedOperationException();
1273         }
1274
1275         @Override
1276         public void commitDone(WriteTraits writeTraits, long csid) {
1277         }
1278
1279         @Override
1280         public void clearUndoList(WriteTraits writeTraits) {
1281         }
1282
1283         @Override
1284         public int clearMetadata() {
1285             return 0;
1286         }
1287
1288                 @Override
1289                 public void startUndo() {
1290                 }
1291     }
1292
1293     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1294
1295         try {
1296
1297             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1298
1299             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1300
1301             flushCounter = 0;
1302             Disposable.safeDispose(clientChanges);
1303             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1304
1305             acquireWriteOnly();
1306
1307             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1308
1309             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1310
1311             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1312             writeState = writeStateT;
1313
1314             assert (null != writer);
1315 //            writer.state.barrier.inc();
1316             long start = System.nanoTime();
1317             T result = request.perform(writer);
1318             long duration = System.nanoTime() - start;
1319             if (DEBUG) {
1320                 System.err.println("################");
1321                 System.err.println("WriteOnly duration " + 1e-9*duration);
1322             }
1323             writeStateT.setResult(result);
1324
1325             // This makes clusters available from server
1326             clusterStream.reallyFlush();
1327
1328             // This will trigger query updates
1329             releaseWriteOnly(writer);
1330             assert (null != writer);
1331
1332             handleUpdatesAndMetadata(writer);
1333
1334         } catch (CancelTransactionException e) {
1335
1336             releaseWriteOnly(writeState.getGraph());
1337
1338             clusterTable.removeWriteOnlyClusters();
1339             state.stopWriteTransaction(clusterStream);
1340
1341         } catch (Throwable e) {
1342
1343             e.printStackTrace();
1344
1345             releaseWriteOnly(writeState.getGraph());
1346
1347             clusterTable.removeWriteOnlyClusters();
1348
1349             if (callback != null)
1350                 callback.exception(new DatabaseException(e));
1351
1352             state.stopWriteTransaction(clusterStream);
1353             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1354
1355         } finally  {
1356
1357             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358
1359         }
1360
1361     }
1362
1363     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1364         scheduleRequest(request, callback, notify, null);
1365     }
1366
1367     @Override
1368     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1369
1370         assertAlive();
1371
1372         assert (request != null);
1373
1374         requestManager.scheduleWrite(new SessionTask(null) {
1375
1376             @Override
1377             public void run(int thread) {
1378
1379                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1380
1381                     try {
1382
1383                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1384
1385                         flushCounter = 0;
1386                         Disposable.safeDispose(clientChanges);
1387                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1388
1389                         acquireWriteOnly();
1390
1391                         VirtualGraph vg = getProvider(request.getProvider());
1392                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1393
1394                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1395
1396                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1397
1398                             @Override
1399                             public void execute(Object result) {
1400                                 if(callback != null) callback.accept(null);
1401                             }
1402
1403                             @Override
1404                             public void exception(Throwable t) {
1405                                 if(callback != null) callback.accept((DatabaseException)t);
1406                             }
1407
1408                         });
1409
1410                         assert (null != writer);
1411 //                        writer.state.barrier.inc();
1412
1413                         try {
1414
1415                             request.perform(writer);
1416
1417                         } catch (Throwable e) {
1418
1419 //                            writer.state.barrier.dec();
1420 //                            writer.waitAsync(null);
1421
1422                             releaseWriteOnly(writer);
1423
1424                             clusterTable.removeWriteOnlyClusters();
1425
1426                             if(!(e instanceof CancelTransactionException)) {
1427                                 if (callback != null)
1428                                     callback.accept(new DatabaseException(e));
1429                             }
1430
1431                             writeState.except(e);
1432
1433                             return;
1434
1435                         }
1436
1437                         // This makes clusters available from server
1438                         boolean empty = clusterStream.reallyFlush();
1439                         // This was needed to make WO requests call metadata listeners.
1440                         // NOTE: the calling event does not contain clientChanges information.
1441                         if (!empty && clientChanges.isEmpty())
1442                             clientChanges.setNotEmpty(true);
1443                         releaseWriteOnly(writer);
1444                         assert (null != writer);
1445                     } finally  {
1446
1447                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1448
1449                     }
1450
1451
1452                 task.finish();
1453
1454             }
1455
1456         }, combine);
1457
1458     }
1459
1460     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1461         scheduleRequest(request, callback, notify, null);
1462     }
1463
1464     /* (non-Javadoc)
1465      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1466      */
1467     @Override
1468     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1469
1470         assert (request != null);
1471
1472         requestManager.scheduleWrite(new SessionTask(null) {
1473
1474             @Override
1475             public void run(int thread) {
1476
1477                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1478
1479                 performWriteOnly(request, notify, callback);
1480
1481                 task.finish();
1482
1483             }
1484
1485         }, combine);
1486
1487     }
1488
1489     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1490
1491         assert (request != null);
1492         assert (procedure != null);
1493
1494         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1495
1496         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1497
1498             @Override
1499             public void run(int thread) {
1500
1501                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1502
1503                 ListenerBase listener = getListenerBase(procedure);
1504
1505                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1506
1507                 try {
1508
1509                     if (listener != null) {
1510
1511                         try {
1512                                 
1513                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1514
1515                                         @Override
1516                                         public void exception(AsyncReadGraph graph, Throwable t) {
1517                                                 procedure.exception(graph, t);
1518                                                 if(throwable != null) {
1519                                                         throwable.set(t);
1520                                                 } else {
1521                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1522                                                 }
1523                                         }
1524
1525                                         @Override
1526                                         public void execute(AsyncReadGraph graph, T t) {
1527                                                 if(result != null) result.set(t);
1528                                                 procedure.execute(graph, t);
1529                                         }
1530
1531                                 };
1532                                 
1533                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1534                                 
1535                         } catch (Throwable t) {
1536                             // This is handled by the AsyncProcedure
1537                                 //Logger.defaultLogError("Internal error", t);
1538                         }
1539
1540                     } else {
1541
1542                         try {
1543
1544 //                            newGraph.state.barrier.inc();
1545
1546                             T t = request.perform(newGraph);
1547
1548                             try {
1549
1550                                 if(result != null) result.set(t);
1551                                 procedure.execute(newGraph, t);
1552
1553                             } catch (Throwable th) {
1554
1555                                 if(throwable != null) {
1556                                     throwable.set(th);
1557                                 } else {
1558                                     Logger.defaultLogError("Unhandled exception", th);
1559                                 }
1560
1561                             }
1562
1563                         } catch (Throwable t) {
1564
1565                             if (DEBUG)
1566                                 t.printStackTrace();
1567
1568                             if(throwable != null) {
1569                                 throwable.set(t);
1570                             } else {
1571                                 Logger.defaultLogError("Unhandled exception", t);
1572                             }
1573
1574                             try {
1575
1576                                 procedure.exception(newGraph, t);
1577
1578                             } catch (Throwable t2) {
1579
1580                                 if(throwable != null) {
1581                                     throwable.set(t2);
1582                                 } else {
1583                                     Logger.defaultLogError("Unhandled exception", t2);
1584                                 }
1585
1586                             }
1587
1588                         }
1589
1590 //                        newGraph.state.barrier.dec();
1591 //                        newGraph.waitAsync(request);
1592
1593                     }
1594
1595                 } finally {
1596
1597                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1598
1599                 }
1600
1601             }
1602
1603         });
1604
1605     }
1606
1607     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1608
1609         assert (request != null);
1610         assert (procedure != null);
1611
1612         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1613
1614         requestManager.scheduleRead(new SessionRead(null, notify) {
1615
1616             @Override
1617             public void run(int thread) {
1618
1619                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1620
1621                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1622
1623                 try {
1624
1625                     if (listener != null) {
1626
1627                         try {
1628                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1629                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1630                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1631                                                 } catch (DatabaseException e) {
1632                                                         Logger.defaultLogError(e);
1633                                                 }
1634
1635                     } else {
1636
1637 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1638 //                                procedure, "request");
1639
1640                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph.asyncBarrier, newGraph, procedure, request);
1641
1642                         try {
1643
1644                             request.perform(newGraph, wrap);
1645                             wrap.get();
1646
1647                         } catch (DatabaseException e) {
1648
1649                                                         Logger.defaultLogError(e);
1650
1651                         }
1652
1653                     }
1654
1655                 } finally {
1656
1657                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1658
1659                 }
1660
1661             }
1662
1663         });
1664
1665     }
1666
1667     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1668
1669         assert (request != null);
1670         assert (procedure != null);
1671
1672         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1673
1674         int sync = notify != null ? thread : -1;
1675
1676         requestManager.scheduleRead(new SessionRead(null, notify) {
1677
1678             @Override
1679             public void run(int thread) {
1680
1681                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1682
1683                 ListenerBase listener = getListenerBase(procedure);
1684
1685                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1686
1687                 try {
1688
1689                     if (listener != null) {
1690
1691                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1692
1693 //                        newGraph.waitAsync(request);
1694
1695                     } else {
1696
1697                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1698
1699                         try {
1700
1701                             request.perform(newGraph, wrapper);
1702
1703                         } catch (Throwable t) {
1704
1705                             t.printStackTrace();
1706
1707                         }
1708
1709                     }
1710
1711                 } finally {
1712
1713                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1714
1715                 }
1716
1717             }
1718
1719         });
1720
1721     }
1722     
1723     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1724
1725         assert (request != null);
1726         assert (procedure != null);
1727
1728         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1729
1730         int sync = notify != null ? thread : -1;
1731
1732         requestManager.scheduleRead(new SessionRead(null, notify) {
1733
1734             @Override
1735             public void run(int thread) {
1736
1737                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1738
1739                 ListenerBase listener = getListenerBase(procedure);
1740
1741                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1742
1743                 try {
1744
1745                     if (listener != null) {
1746
1747                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1748
1749 //                        newGraph.waitAsync(request);
1750
1751                     } else {
1752
1753                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1754
1755                         try {
1756
1757                             request.perform(newGraph, wrapper);
1758
1759                         } catch (Throwable t) {
1760
1761                             t.printStackTrace();
1762
1763                         }
1764
1765                     }
1766
1767                 } finally {
1768
1769                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1770
1771                 }
1772
1773             }
1774
1775         });
1776
1777     }
1778
1779     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1780
1781         assert (request != null);
1782         assert (procedure != null);
1783
1784         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1785
1786         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1787
1788             @Override
1789             public void run(int thread) {
1790
1791                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1792
1793                 ListenerBase listener = getListenerBase(procedure);
1794
1795                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1796
1797                 try {
1798
1799                     if (listener != null) {
1800
1801                         try {
1802                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1803                         } catch (DatabaseException e) {
1804                             Logger.defaultLogError(e);
1805                         }
1806
1807                     } else {
1808
1809 //                        newGraph.state.barrier.inc();
1810
1811                         request.register(newGraph, new Listener<T>() {
1812
1813                             @Override
1814                             public void exception(Throwable t) {
1815                                 if(throwable != null) throwable.set(t);
1816                                 procedure.exception(t);
1817 //                                newGraph.state.barrier.dec();
1818                             }
1819
1820                             @Override
1821                             public void execute(T t) {
1822                                 if(result != null) result.set(t);
1823                                 procedure.execute(t);
1824 //                                newGraph.state.barrier.dec();
1825                             }
1826
1827                             @Override
1828                             public boolean isDisposed() {
1829                                 return true;
1830                             }
1831
1832                         });
1833
1834 //                        newGraph.waitAsync(request);
1835
1836                     }
1837
1838                 } finally {
1839
1840                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1841
1842                 }
1843
1844             }
1845
1846         });
1847
1848     }
1849
1850
1851     @Override
1852     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1853
1854         scheduleRequest(request, procedure, null, null, null);
1855
1856     }
1857
1858     @Override
1859     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1860         assertNotSession();
1861         Semaphore notify = new Semaphore(0);
1862         DataContainer<Throwable> container = new DataContainer<Throwable>();
1863         DataContainer<T> result = new DataContainer<T>();
1864         scheduleRequest(request, procedure, notify, container, result);
1865         acquire(notify, request);
1866         Throwable throwable = container.get();
1867         if(throwable != null) {
1868             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1869             else throw new DatabaseException("Unexpected exception", throwable);
1870         }
1871         return result.get();
1872     }
1873
1874     @Override
1875     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1876         assertNotSession();
1877         Semaphore notify = new Semaphore(0);
1878         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1879         final DataContainer<T> resultContainer = new DataContainer<T>();
1880         scheduleRequest(request, new AsyncProcedure<T>() {
1881             @Override
1882             public void exception(AsyncReadGraph graph, Throwable throwable) {
1883                 exceptionContainer.set(throwable);
1884                 procedure.exception(graph, throwable);
1885             }
1886             @Override
1887             public void execute(AsyncReadGraph graph, T result) {
1888                 resultContainer.set(result);
1889                 procedure.execute(graph, result);
1890             }
1891         }, getListenerBase(procedure), notify);
1892         acquire(notify, request, procedure);
1893         Throwable throwable = exceptionContainer.get();
1894         if (throwable != null) {
1895             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1896             else throw new DatabaseException("Unexpected exception", throwable);
1897         }
1898         return resultContainer.get();
1899     }
1900
1901
1902     @Override
1903     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1904         assertNotSession();
1905         Semaphore notify = new Semaphore(0);
1906         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1907         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1908             @Override
1909             public void exception(AsyncReadGraph graph, Throwable throwable) {
1910                 exceptionContainer.set(throwable);
1911                 procedure.exception(graph, throwable);
1912             }
1913             @Override
1914             public void execute(AsyncReadGraph graph, T result) {
1915                 procedure.execute(graph, result);
1916             }
1917             @Override
1918             public void finished(AsyncReadGraph graph) {
1919                 procedure.finished(graph);
1920             }
1921         }, notify);
1922         acquire(notify, request, procedure);
1923         Throwable throwable = exceptionContainer.get();
1924         if (throwable != null) {
1925             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1926             else throw new DatabaseException("Unexpected exception", throwable);
1927         }
1928         // TODO: implement return value
1929         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1930         return null;
1931     }
1932
1933     @Override
1934     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1935         return syncRequest(request, new ProcedureAdapter<T>());
1936
1937
1938 //        assert(request != null);
1939 //
1940 //        final DataContainer<T> result = new DataContainer<T>();
1941 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1942 //
1943 //        syncRequest(request, new Procedure<T>() {
1944 //
1945 //            @Override
1946 //            public void execute(T t) {
1947 //                result.set(t);
1948 //            }
1949 //
1950 //            @Override
1951 //            public void exception(Throwable t) {
1952 //                exception.set(t);
1953 //            }
1954 //
1955 //        });
1956 //
1957 //        Throwable t = exception.get();
1958 //        if(t != null) {
1959 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1960 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1961 //        }
1962 //
1963 //        return result.get();
1964
1965     }
1966
1967     @Override
1968     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1969         return syncRequest(request, (Procedure<T>)procedure);
1970     }
1971
1972
1973 //    @Override
1974 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1975 //        assertNotSession();
1976 //        Semaphore notify = new Semaphore(0);
1977 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1978 //        DataContainer<T> result = new DataContainer<T>();
1979 //        scheduleRequest(request, procedure, notify, container, result);
1980 //        acquire(notify, request);
1981 //        Throwable throwable = container.get();
1982 //        if(throwable != null) {
1983 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1984 //            else throw new DatabaseException("Unexpected exception", throwable);
1985 //        }
1986 //        return result.get();
1987 //    }
1988
1989
1990     @Override
1991     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1992         assertNotSession();
1993         Semaphore notify = new Semaphore(0);
1994         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1995         final DataContainer<T> result = new DataContainer<T>();
1996         scheduleRequest(request, procedure, notify, container, result);
1997         acquire(notify, request);
1998         Throwable throwable = container.get();
1999         if (throwable != null) {
2000             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
2001             else throw new DatabaseException("Unexpected exception", throwable);
2002         }
2003         return result.get();
2004     }
2005
2006     @Override
2007     public void syncRequest(Write request) throws DatabaseException  {
2008         assertNotSession();
2009         assertAlive();
2010         Semaphore notify = new Semaphore(0);
2011         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2012         scheduleRequest(request, e -> exception.set(e), notify);
2013         acquire(notify, request);
2014         if(exception.get() != null) throw exception.get();
2015     }
2016
2017     @Override
2018     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2019         assertNotSession();
2020         Semaphore notify = new Semaphore(0);
2021         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2022         final DataContainer<T> result = new DataContainer<T>();
2023         scheduleRequest(request, new Procedure<T>() {
2024
2025             @Override
2026             public void exception(Throwable t) {
2027               exception.set(t);
2028             }
2029
2030             @Override
2031             public void execute(T t) {
2032               result.set(t);
2033             }
2034
2035         }, notify);
2036         acquire(notify, request);
2037         if(exception.get() != null) {
2038             Throwable t = exception.get();
2039             if(t instanceof DatabaseException) throw (DatabaseException)t;
2040             else throw new DatabaseException(t);
2041         }
2042         return result.get();
2043     }
2044
2045     @Override
2046     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2047         assertNotSession();
2048         Semaphore notify = new Semaphore(0);
2049         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2050         final DataContainer<T> result = new DataContainer<T>();
2051         scheduleRequest(request, new Procedure<T>() {
2052
2053             @Override
2054             public void exception(Throwable t) {
2055               exception.set(t);
2056             }
2057
2058             @Override
2059             public void execute(T t) {
2060               result.set(t);
2061             }
2062
2063         }, notify);
2064         acquire(notify, request);
2065         if(exception.get() != null) {
2066             Throwable t = exception.get();
2067             if(t instanceof DatabaseException) throw (DatabaseException)t;
2068             else throw new DatabaseException(t);
2069         }
2070         return result.get();
2071     }
2072
2073     @Override
2074     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2075         assertNotSession();
2076         Semaphore notify = new Semaphore(0);
2077         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2078         final DataContainer<T> result = new DataContainer<T>();
2079         scheduleRequest(request, new Procedure<T>() {
2080
2081             @Override
2082             public void exception(Throwable t) {
2083               exception.set(t);
2084             }
2085
2086             @Override
2087             public void execute(T t) {
2088               result.set(t);
2089             }
2090
2091         }, notify);
2092         acquire(notify, request);
2093         if(exception.get() != null) {
2094             Throwable t = exception.get();
2095             if(t instanceof DatabaseException) throw (DatabaseException)t;
2096             else throw new DatabaseException(t);
2097         }
2098         return result.get();
2099     }
2100
2101     @Override
2102     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2103         assertNotSession();
2104         Semaphore notify = new Semaphore(0);
2105         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2106         scheduleRequest(request, e -> exception.set(e), notify);
2107         acquire(notify, request);
2108         if(exception.get() != null) throw exception.get();
2109     }
2110
2111     @Override
2112     public void syncRequest(WriteOnly request) throws DatabaseException  {
2113         assertNotSession();
2114         assertAlive();
2115         Semaphore notify = new Semaphore(0);
2116         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2117         scheduleRequest(request, e -> exception.set(e), notify);
2118         acquire(notify, request);
2119         if(exception.get() != null) throw exception.get();
2120     }
2121
2122     /*
2123      *
2124      * GraphRequestProcessor interface
2125      */
2126
2127     @Override
2128     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2129
2130         scheduleRequest(request, procedure, null, null);
2131
2132     }
2133
2134     @Override
2135     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2136
2137         scheduleRequest(request, procedure, null);
2138
2139     }
2140
2141     @Override
2142     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2143
2144         scheduleRequest(request, procedure, null, null, null);
2145
2146     }
2147
2148     @Override
2149     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2150
2151         scheduleRequest(request, callback, null);
2152
2153     }
2154
2155     @Override
2156     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2157
2158         scheduleRequest(request, procedure, null);
2159
2160     }
2161
2162     @Override
2163     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2164
2165         scheduleRequest(request, procedure, null);
2166
2167     }
2168
2169     @Override
2170     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2171
2172         scheduleRequest(request, procedure, null);
2173
2174     }
2175
2176     @Override
2177     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2178
2179         scheduleRequest(request, callback, null);
2180
2181     }
2182
2183     @Override
2184     public void asyncRequest(final Write r) {
2185         asyncRequest(r, null);
2186     }
2187
2188     @Override
2189     public void asyncRequest(final DelayedWrite r) {
2190         asyncRequest(r, null);
2191     }
2192
2193     @Override
2194     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2195
2196         scheduleRequest(request, callback, null);
2197
2198     }
2199
2200     @Override
2201     public void asyncRequest(final WriteOnly request) {
2202
2203         asyncRequest(request, null);
2204
2205     }
2206
2207     @Override
2208     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2209         r.request(this, procedure);
2210     }
2211
2212     @Override
2213     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2214         r.request(this, procedure);
2215     }
2216
2217     @Override
2218     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2219         r.request(this, procedure);
2220     }
2221
2222     @Override
2223     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2224         r.request(this, procedure);
2225     }
2226
2227     @Override
2228     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2229         r.request(this, procedure);
2230     }
2231
2232     @Override
2233     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2234         r.request(this, procedure);
2235     }
2236
2237     @Override
2238     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2239         return r.request(this);
2240     }
2241
2242     @Override
2243     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2244         return r.request(this);
2245     }
2246
2247     @Override
2248     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2249         r.request(this, procedure);
2250     }
2251
2252     @Override
2253     public <T> void async(WriteInterface<T> r) {
2254         r.request(this, new ProcedureAdapter<T>());
2255     }
2256
2257     //@Override
2258     public void incAsync() {
2259         state.incAsync();
2260     }
2261
2262     //@Override
2263     public void decAsync() {
2264         state.decAsync();
2265     }
2266
2267     public long getCluster(ResourceImpl resource) {
2268         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2269         return cluster.getClusterId();
2270     }
2271
2272     public long getCluster(int id) {
2273         if (clusterTable == null)
2274             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2275         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2276     }
2277
2278     public ResourceImpl getResource(int id) {
2279         return new ResourceImpl(resourceSupport, id);
2280     }
2281
2282     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2283         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2284         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2285         int key = proxy.getClusterKey();
2286         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2287         return new ResourceImpl(resourceSupport, resourceKey);
2288     }
2289
2290     public ResourceImpl getResource2(int id) {
2291         assert (id != 0);
2292         return new ResourceImpl(resourceSupport, id);
2293     }
2294
2295     final public int getId(ResourceImpl impl) {
2296         return impl.id;
2297     }
2298
2299     public static final Charset UTF8 = Charset.forName("utf-8");
2300
2301
2302
2303
2304     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2305         for(TransientGraph g : support.providers) {
2306             if(g.isPending(subject)) return false;
2307         }
2308         return true;
2309     }
2310
2311     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2312         for(TransientGraph g : support.providers) {
2313             if(g.isPending(subject, predicate)) return false;
2314         }
2315         return true;
2316     }
2317
2318     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2319
2320         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2321
2322             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2323
2324             @Override
2325             public void accept(ReadGraphImpl graph) {
2326                 if(ready.decrementAndGet() == 0) {
2327                     runnable.accept(graph);
2328                 }
2329             }
2330
2331         };
2332
2333         for(TransientGraph g : support.providers) {
2334             if(g.isPending(subject)) {
2335                 try {
2336                     g.load(graph, subject, composite);
2337                 } catch (DatabaseException e) {
2338                     e.printStackTrace();
2339                 }
2340             } else {
2341                 composite.accept(graph);
2342             }
2343         }
2344
2345         composite.accept(graph);
2346
2347     }
2348
2349     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2350
2351         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2352
2353             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2354
2355             @Override
2356             public void accept(ReadGraphImpl graph) {
2357                 if(ready.decrementAndGet() == 0) {
2358                     runnable.accept(graph);
2359                 }
2360             }
2361
2362         };
2363
2364         for(TransientGraph g : support.providers) {
2365             if(g.isPending(subject, predicate)) {
2366                 try {
2367                     g.load(graph, subject, predicate, composite);
2368                 } catch (DatabaseException e) {
2369                     e.printStackTrace();
2370                 }
2371             } else {
2372                 composite.accept(graph);
2373             }
2374         }
2375
2376         composite.accept(graph);
2377
2378     }
2379
2380 //    void dumpHeap() {
2381 //
2382 //        try {
2383 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2384 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2385 //                    "d:/heap" + flushCounter + ".txt", true);
2386 //        } catch (IOException e) {
2387 //            e.printStackTrace();
2388 //        }
2389 //
2390 //    }
2391
2392     void fireReactionsToSynchronize(ChangeSet cs) {
2393
2394         // Do not fire empty events
2395         if (cs.isEmpty())
2396             return;
2397
2398         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2399 //        g.state.barrier.inc();
2400
2401         try {
2402
2403             if (!cs.isEmpty()) {
2404                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2405                 for (ChangeListener l : changeListeners2) {
2406                     try {
2407                         l.graphChanged(e2);
2408                     } catch (Exception ex) {
2409                         ex.printStackTrace();
2410                     }
2411                 }
2412             }
2413
2414         } finally {
2415
2416 //            g.state.barrier.dec();
2417 //            g.waitAsync(null);
2418
2419         }
2420
2421     }
2422
2423     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2424         try {
2425
2426             // Do not fire empty events
2427             if (cs2.isEmpty())
2428                 return;
2429
2430 //            graph.restart();
2431 //            graph.state.barrier.inc();
2432
2433             try {
2434
2435                 if (!cs2.isEmpty()) {
2436
2437                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2438                     for (ChangeListener l : changeListeners2) {
2439                         try {
2440                             l.graphChanged(e2);
2441 //                            System.out.println("changelistener " + l);
2442                         } catch (Exception ex) {
2443                             ex.printStackTrace();
2444                         }
2445                     }
2446
2447                 }
2448
2449             } finally {
2450
2451 //                graph.state.barrier.dec();
2452 //                graph.waitAsync(null);
2453
2454             }
2455         } catch (Throwable t) {
2456             t.printStackTrace();
2457
2458         }
2459     }
2460     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2461
2462         try {
2463
2464             // Do not fire empty events
2465             if (cs2.isEmpty())
2466                 return;
2467
2468             // Do not fire on virtual requests
2469             if(graph.getProvider() != null)
2470                 return;
2471
2472             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2473
2474             try {
2475
2476                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2477                 for (ChangeListener l : metadataListeners) {
2478                     try {
2479                         l.graphChanged(e2);
2480                     } catch (Throwable ex) {
2481                         ex.printStackTrace();
2482                     }
2483                 }
2484
2485             } finally {
2486
2487             }
2488
2489         } catch (Throwable t) {
2490             t.printStackTrace();
2491         }
2492
2493     }
2494
2495
2496     /*
2497      * (non-Javadoc)
2498      *
2499      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2500      */
2501     @Override
2502     public <T> T getService(Class<T> api) {
2503         T t = peekService(api);
2504         if (t == null) {
2505             if (state.isClosed())
2506                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2507             throw new ServiceNotFoundException(this, api);
2508         }
2509         return t;
2510     }
2511
    /**
     * Supplies the object that {@code peekService(Class)} returns when asked for
     * {@code ServerInformation.class}. Subclasses provide the implementation.
     *
     * @return the server information for this session
     *         (NOTE(review): presumably a cached copy, as the name suggests - confirm in the implementations)
     */
    protected abstract ServerInformation getCachedServerInformation();
2516
        // Two-entry cache of recent service lookups, maintained by peekService(Class)
        // and refreshed by registerService(Class, Object). Slot 1 holds the most
        // recently used key/service pair, slot 2 the one used before it.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2521
2522     /*
2523      * (non-Javadoc)
2524      *
2525      * @see
2526      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2527      */
2528     @SuppressWarnings("unchecked")
2529     @Override
2530     public synchronized <T> T peekService(Class<T> api) {
2531
2532                 if(serviceKey1 == api) {
2533                         return (T)service1;
2534                 } else if (serviceKey2 == api) {
2535                         // Promote this key
2536                         Object result = service2;
2537                         service2 = service1;
2538                         serviceKey2 = serviceKey1;
2539                         service1 = result;
2540                         serviceKey1 = api;
2541                         return (T)result;
2542                 }
2543
2544         if (Layer0.class == api)
2545                 return (T) L0;
2546         if (ServerInformation.class == api)
2547             return (T) getCachedServerInformation();
2548         else if (WriteGraphImpl.class == api)
2549             return (T) writeState.getGraph();
2550         else if (ClusterBuilder.class == api)
2551             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2552         else if (ClusterBuilderFactory.class == api)
2553             return (T)new ClusterBuilderFactoryImpl(this);
2554
2555                 service2 = service1;
2556                 serviceKey2 = serviceKey1;
2557
2558         service1 = serviceLocator.peekService(api);
2559         serviceKey1 = api;
2560
2561         return (T)service1;
2562
2563     }
2564
2565     /*
2566      * (non-Javadoc)
2567      *
2568      * @see
2569      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2570      */
2571     @Override
2572     public boolean hasService(Class<?> api) {
2573         return serviceLocator.hasService(api);
2574     }
2575
2576     /**
2577      * @param api the api that must be implemented by the specified service
2578      * @param service the service implementation
2579      */
2580     @Override
2581     public <T> void registerService(Class<T> api, T service) {
2582         if(Layer0.class == api) {
2583                 L0 = (Layer0)service;
2584                 return;
2585         }
2586         serviceLocator.registerService(api, service);
2587         if (TransactionPolicySupport.class == api) {
2588             transactionPolicy = (TransactionPolicySupport)service;
2589             state.resetTransactionPolicy();
2590         }
2591         if (api == serviceKey1)
2592                 service1 = service;
2593         else if (api == serviceKey2)
2594                 service2 = service;
2595     }
2596
2597     // ----------------
2598     // SessionMonitor
2599     // ----------------
2600
2601     void fireSessionVariableChange(String variable) {
2602         for (MonitorHandler h : monitorHandlers) {
2603             MonitorContext ctx = monitorContexts.getLeft(h);
2604             assert ctx != null;
2605             // SafeRunner functionality repeated here to avoid dependency.
2606             try {
2607                 h.valuesChanged(ctx);
2608             } catch (Exception e) {
2609                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2610             } catch (LinkageError e) {
2611                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2612             }
2613         }
2614     }
2615
2616
2617     class ResourceSerializerImpl implements ResourceSerializer {
2618
2619         public long createRandomAccessId(int id) throws DatabaseException {
2620             if(id < 0)
2621                 return id;
2622             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2623             long cluster = getCluster(id);
2624             if (0 == cluster)
2625                 return 0; // Better to return 0 then invalid id.
2626             long result = ClusterTraitsBase.createResourceId(cluster, index);
2627             return result;
2628         }
2629
2630         @Override
2631         public long getRandomAccessId(Resource resource) throws DatabaseException {
2632
2633             ResourceImpl resourceImpl = (ResourceImpl) resource;
2634             return createRandomAccessId(resourceImpl.id);
2635
2636         }
2637
2638         @Override
2639         public int getTransientId(Resource resource) throws DatabaseException {
2640
2641             ResourceImpl resourceImpl = (ResourceImpl) resource;
2642             return resourceImpl.id;
2643
2644         }
2645
2646         @Override
2647         public String createRandomAccessId(Resource resource)
2648         throws InvalidResourceReferenceException {
2649
2650             if(resource == null) throw new IllegalArgumentException();
2651
2652             ResourceImpl resourceImpl = (ResourceImpl) resource;
2653
2654             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2655
2656             int r;
2657             try {
2658                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2659             } catch (DatabaseException e1) {
2660                 throw new InvalidResourceReferenceException(e1);
2661             }
2662             try {
2663                 // Serialize as '<resource index>_<cluster id>'
2664                 return "" + r + "_" + getCluster(resourceImpl);
2665             } catch (Throwable e) {
2666                 e.printStackTrace();
2667                 throw new InvalidResourceReferenceException(e);
2668             } finally {
2669             }
2670         }
2671
2672         public int getTransientId(long serialized) throws DatabaseException {
2673             if (serialized <= 0)
2674                 return (int)serialized;
2675             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2676             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2677             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2678             if (cluster == null)
2679                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2680             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2681             return key;
2682         }
2683
2684         @Override
2685         public Resource getResource(long randomAccessId) throws DatabaseException {
2686             return getResourceByKey(getTransientId(randomAccessId));
2687         }
2688
2689         @Override
2690         public Resource getResource(int transientId) throws DatabaseException {
2691             return getResourceByKey(transientId);
2692         }
2693
2694         @Override
2695         public Resource getResource(String randomAccessId)
2696         throws InvalidResourceReferenceException {
2697             try {
2698                 int i = randomAccessId.indexOf('_');
2699                 if (i == -1)
2700                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2701                             + randomAccessId + "'");
2702                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2703                 if(r < 0) return getResourceByKey(r);
2704                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2705                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2706                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2707                 if (cluster.hasResource(key, clusterTranslator))
2708                     return getResourceByKey(key);
2709             } catch (InvalidResourceReferenceException e) {
2710                 throw e;
2711             } catch (NumberFormatException e) {
2712                 throw new InvalidResourceReferenceException(e);
2713             } catch (Throwable e) {
2714                 e.printStackTrace();
2715                 throw new InvalidResourceReferenceException(e);
2716             } finally {
2717             }
2718             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2719         }
2720
2721         @Override
2722         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2723             try {
2724                 return true;
2725             } catch (Throwable e) {
2726                 e.printStackTrace();
2727                 throw new InvalidResourceReferenceException(e);
2728             } finally {
2729             }
2730         }
2731     }
2732
2733     // Copied from old SessionImpl
2734
2735     void check() {
2736         if (state.isClosed())
2737             throw new Error("Session closed.");
2738     }
2739
2740
2741
2742     public ResourceImpl getNewResource(long clusterId) {
2743         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2744         int newId;
2745         try {
2746             newId = cluster.createResource(clusterTranslator);
2747         } catch (DatabaseException e) {
2748             Logger.defaultLogError(e);
2749             return null;
2750         }
2751         return new ResourceImpl(resourceSupport, newId);
2752     }
2753
2754     public ResourceImpl getNewResource(Resource clusterSet)
2755     throws DatabaseException {
2756         long resourceId = clusterSet.getResourceId();
2757         Long clusterId = clusterSetsSupport.get(resourceId);
2758         if (null == clusterId)
2759             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2760         if (Constants.NewClusterId == clusterId) {
2761             clusterId = getService(ClusteringSupport.class).createCluster();
2762             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2763                 createdClusters.add(clusterId);
2764             clusterSetsSupport.put(resourceId, clusterId);
2765             return getNewResource(clusterId);
2766         } else {
2767             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2768             ResourceImpl result;
2769             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2770                 clusterId = getService(ClusteringSupport.class).createCluster();
2771                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2772                     createdClusters.add(clusterId);
2773                 clusterSetsSupport.put(resourceId, clusterId);
2774                 return getNewResource(clusterId);
2775             } else {
2776                 result = getNewResource(clusterId);
2777                 int resultKey = querySupport.getId(result);
2778                 long resultCluster = querySupport.getClusterId(resultKey);
2779                 if (clusterId != resultCluster)
2780                     clusterSetsSupport.put(resourceId, resultCluster);
2781                 return result;
2782             }
2783         }
2784     }
2785
2786     public void getNewClusterSet(Resource clusterSet)
2787     throws DatabaseException {
2788         if (DEBUG)
2789             System.out.println("new cluster set=" + clusterSet);
2790         long resourceId = clusterSet.getResourceId();
2791         if (clusterSetsSupport.containsKey(resourceId))
2792             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2793         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2794     }
2795     public boolean containsClusterSet(Resource clusterSet)
2796     throws ServiceException {
2797         long resourceId = clusterSet.getResourceId();
2798         return clusterSetsSupport.containsKey(resourceId);
2799     }
2800     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2801         Resource r = defaultClusterSet;
2802         defaultClusterSet = clusterSet;
2803         return r;
2804     }
2805     void printDiagnostics() {
2806
2807         if (DIAGNOSTICS) {
2808
2809             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2810             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2811             for(ClusterI cluster : clusterTable.getClusters()) {
2812                 try {
2813                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2814                         residentClusters.set(residentClusters.get() + 1);
2815                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2816                     }
2817                 } catch (DatabaseException e) {
2818                     Logger.defaultLogError(e);
2819                 }
2820             }
2821
2822             System.out.println("--------------------------------");
2823             System.out.println("Cluster information:");
2824             System.out.println("-amount of resident clusters=" + residentClusters.get());
2825             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2826
2827             for(ClusterI cluster : clusterTable.getClusters()) {
2828                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2829                 try {
2830                     if (!cluster.isLoaded())
2831                         System.out.println("not loaded]");
2832                     else if (cluster.isEmpty())
2833                         System.out.println("is empty]");
2834                     else {
2835                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2836                         System.out.print(",references=" + cluster.getReferenceCount());
2837                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2838                     }
2839                 } catch (DatabaseException e) {
2840                     Logger.defaultLogError(e);
2841                     System.out.println("is corrupted]");
2842                 }
2843             }
2844
2845             queryProvider2.printDiagnostics();
2846             System.out.println("--------------------------------");
2847         }
2848
2849     }
2850
2851     public void fireStartReadTransaction() {
2852
2853         if (DIAGNOSTICS)
2854             System.out.println("StartReadTransaction");
2855
2856         for (SessionEventListener listener : eventListeners) {
2857             try {
2858                 listener.readTransactionStarted();
2859             } catch (Throwable t) {
2860                 t.printStackTrace();
2861             }
2862         }
2863
2864     }
2865
2866     public void fireFinishReadTransaction() {
2867
2868         if (DIAGNOSTICS)
2869             System.out.println("FinishReadTransaction");
2870
2871         for (SessionEventListener listener : eventListeners) {
2872             try {
2873                 listener.readTransactionFinished();
2874             } catch (Throwable t) {
2875                 t.printStackTrace();
2876             }
2877         }
2878
2879     }
2880
2881     public void fireStartWriteTransaction() {
2882
2883         if (DIAGNOSTICS)
2884             System.out.println("StartWriteTransaction");
2885
2886         for (SessionEventListener listener : eventListeners) {
2887             try {
2888                 listener.writeTransactionStarted();
2889             } catch (Throwable t) {
2890                 t.printStackTrace();
2891             }
2892         }
2893
2894     }
2895
2896     public void fireFinishWriteTransaction() {
2897
2898         if (DIAGNOSTICS)
2899             System.out.println("FinishWriteTransaction");
2900
2901         for (SessionEventListener listener : eventListeners) {
2902             try {
2903                 listener.writeTransactionFinished();
2904             } catch (Throwable t) {
2905                 t.printStackTrace();
2906             }
2907         }
2908
2909         Indexing.resetDependenciesIndexingDisabled();
2910
2911     }
2912
2913     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2914         throw new Error("Not supported at the moment.");
2915     }
2916
2917     State getState() {
2918         return state;
2919     }
2920
2921     ClusterTable getClusterTable() {
2922         return clusterTable;
2923     }
2924
2925     public GraphSession getGraphSession() {
2926         return graphSession;
2927     }
2928
2929     public QueryProcessor getQueryProvider2() {
2930         return queryProvider2;
2931     }
2932
2933     ClientChangesImpl getClientChanges() {
2934         return clientChanges;
2935     }
2936
2937     boolean getWriteOnly() {
2938         return writeOnly;
2939     }
2940
2941     static int counter = 0;
2942
2943     public void onClusterLoaded(long clusterId) {
2944
2945         clusterTable.updateSize();
2946
2947     }
2948
2949
2950
2951
2952     /*
2953      * Implementation of the interface RequestProcessor
2954      */
2955
2956     @Override
2957     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2958
2959         assertNotSession();
2960         assertAlive();
2961
2962         assert(request != null);
2963
2964         final DataContainer<T> result = new DataContainer<T>();
2965         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2966
2967         syncRequest(request, new AsyncProcedure<T>() {
2968
2969             @Override
2970             public void execute(AsyncReadGraph graph, T t) {
2971                 result.set(t);
2972             }
2973
2974             @Override
2975             public void exception(AsyncReadGraph graph, Throwable t) {
2976                 exception.set(t);
2977             }
2978
2979         });
2980
2981         Throwable t = exception.get();
2982         if(t != null) {
2983             if(t instanceof DatabaseException) throw (DatabaseException)t;
2984             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2985         }
2986
2987         return result.get();
2988
2989     }
2990
2991 //    @Override
2992 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2993 //        assertNotSession();
2994 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2995 //    }
2996
2997     @Override
2998     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2999         assertNotSession();
3000         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3001     }
3002
3003     @Override
3004     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
3005         assertNotSession();
3006         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3007     }
3008
3009     @Override
3010     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3011         assertNotSession();
3012         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3013     }
3014
3015     @Override
3016     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3017         assertNotSession();
3018         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3019     }
3020
3021     @Override
3022     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3023
3024         assertNotSession();
3025
3026         assert(request != null);
3027
3028         final DataContainer<T> result = new DataContainer<T>();
3029         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3030
3031         syncRequest(request, new AsyncProcedure<T>() {
3032
3033             @Override
3034             public void execute(AsyncReadGraph graph, T t) {
3035                 result.set(t);
3036             }
3037
3038             @Override
3039             public void exception(AsyncReadGraph graph, Throwable t) {
3040                 exception.set(t);
3041             }
3042
3043         });
3044
3045         Throwable t = exception.get();
3046         if(t != null) {
3047             if(t instanceof DatabaseException) throw (DatabaseException)t;
3048             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3049         }
3050
3051         return result.get();
3052
3053     }
3054
3055     @Override
3056     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3057         assertNotSession();
3058         return syncRequest(request, (AsyncProcedure<T>)procedure);
3059     }
3060
3061     @Override
3062     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3063         assertNotSession();
3064         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3065     }
3066
3067     @Override
3068     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3069         assertNotSession();
3070         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3071     }
3072
3073     @Override
3074     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3075         assertNotSession();
3076         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3077     }
3078
3079     @Override
3080     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3081         assertNotSession();
3082         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3083     }
3084
3085     @Override
3086     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3087
3088         assertNotSession();
3089
3090         assert(request != null);
3091
3092         final ArrayList<T> result = new ArrayList<T>();
3093         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3094
3095         syncRequest(request, new SyncMultiProcedure<T>() {
3096
3097             @Override
3098             public void execute(ReadGraph graph, T t) {
3099                 synchronized(result) {
3100                     result.add(t);
3101                 }
3102             }
3103
3104             @Override
3105             public void finished(ReadGraph graph) {
3106             }
3107
3108             @Override
3109             public void exception(ReadGraph graph, Throwable t) {
3110                 exception.set(t);
3111             }
3112
3113
3114         });
3115
3116         Throwable t = exception.get();
3117         if(t != null) {
3118             if(t instanceof DatabaseException) throw (DatabaseException)t;
3119             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3120         }
3121
3122         return result;
3123
3124     }
3125
3126     @Override
3127     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3128         assertNotSession();
3129         throw new Error("Not implemented!");
3130     }
3131
3132     @Override
3133     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3134         assertNotSession();
3135         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3136     }
3137
3138     @Override
3139     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3140         assertNotSession();
3141         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3142     }
3143
3144     @Override
3145     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3146         assertNotSession();
3147         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3148     }
3149
3150     @Override
3151     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3152
3153         assertNotSession();
3154
3155         assert(request != null);
3156
3157         final ArrayList<T> result = new ArrayList<T>();
3158         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3159
3160         syncRequest(request, new AsyncMultiProcedure<T>() {
3161
3162             @Override
3163             public void execute(AsyncReadGraph graph, T t) {
3164                 synchronized(result) {
3165                     result.add(t);
3166                 }
3167             }
3168
3169             @Override
3170             public void finished(AsyncReadGraph graph) {
3171             }
3172
3173             @Override
3174             public void exception(AsyncReadGraph graph, Throwable t) {
3175                 exception.set(t);
3176             }
3177
3178
3179         });
3180
3181         Throwable t = exception.get();
3182         if (t != null) {
3183             if (t instanceof DatabaseException)
3184                 throw (DatabaseException) t;
3185             else
3186                 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3187         }
3188
3189         return result;
3190
3191     }
3192
3193     @Override
3194     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3195         assertNotSession();
3196         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3197     }
3198
3199     @Override
3200     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3201         assertNotSession();
3202         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3203     }
3204
3205     @Override
3206     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3207         assertNotSession();
3208        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3209     }
3210
3211     @Override
3212     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3213         assertNotSession();
3214         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3215     }
3216
3217     @Override
3218     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3219         assertNotSession();
3220         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3221     }
3222
3223
3224     @Override
3225     public <T> void asyncRequest(Read<T> request) {
3226
3227         asyncRequest(request, new ProcedureAdapter<T>() {
3228             @Override
3229             public void exception(Throwable t) {
3230                 t.printStackTrace();
3231             }
3232         });
3233
3234     }
3235
3236     @Override
3237     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3238         asyncRequest(request, (AsyncProcedure<T>)procedure);
3239     }
3240
3241     @Override
3242     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3243         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3244     }
3245
3246     @Override
3247     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3248         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3249     }
3250
3251     @Override
3252     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3253         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3254     }
3255
3256     @Override
3257     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3258         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3259     }
3260
3261     @Override
3262     final public <T> void asyncRequest(final AsyncRead<T> request) {
3263
3264         assert(request != null);
3265
3266         asyncRequest(request, new ProcedureAdapter<T>() {
3267             @Override
3268             public void exception(Throwable t) {
3269                 t.printStackTrace();
3270             }
3271         });
3272
3273     }
3274
    /**
     * Asynchronous read with an {@link AsyncListener}: hands the request to
     * {@code scheduleRequest}, passing the listener in two argument positions.
     */
    @Override
    public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
        // NOTE(review): presumably the first occurrence is the result procedure, the second
        // the listener registration and the null an absent notification handle — confirm
        // against scheduleRequest.
        scheduleRequest(request, procedure, procedure, null);
    }
3279
3280     @Override
3281     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3282         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3283     }
3284
3285     @Override
3286     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3287         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3288     }
3289
3290     @Override
3291     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3292         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3293     }
3294
3295     @Override
3296     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3297         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(MultiRead<T> request) {
3302
3303         assert(request != null);
3304
3305         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3306             @Override
3307             public void exception(ReadGraph graph, Throwable t) {
3308                 t.printStackTrace();
3309             }
3310         });
3311
3312     }
3313
3314     @Override
3315     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3316         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3317     }
3318
3319     @Override
3320     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3321         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3322     }
3323
    /**
     * Asynchronous multi-read with a {@link SyncMultiProcedure}: hands the request
     * and the procedure to {@code scheduleRequest}.
     */
    @Override
    public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
        // NOTE(review): the meaning of the trailing null is defined by scheduleRequest — confirm.
        scheduleRequest(request, procedure, null);
    }
3328
3329     @Override
3330     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3331         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3332     }
3333
3334     @Override
3335     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3336
3337         assert(request != null);
3338
3339         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3340             @Override
3341             public void exception(AsyncReadGraph graph, Throwable t) {
3342                 t.printStackTrace();
3343             }
3344         });
3345
3346     }
3347
3348     @Override
3349     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3350         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3351     }
3352
3353     @Override
3354     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3355         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3356     }
3357
3358     @Override
3359     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3360         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3361     }
3362
3363     @Override
3364     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3365         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3366     }
3367
3368     @Override
3369     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3370         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3371     }
3372
3373     @Override
3374     final public <T> void asyncRequest(final ExternalRead<T> request) {
3375
3376         assert(request != null);
3377
3378         asyncRequest(request, new ProcedureAdapter<T>() {
3379             @Override
3380             public void exception(Throwable t) {
3381                 t.printStackTrace();
3382             }
3383         });
3384
3385     }
3386
3387     @Override
3388     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3389         asyncRequest(request, (Procedure<T>)procedure);
3390     }
3391
3392     boolean sameProvider(Write request) {
3393         if(writeState.getGraph().provider != null) {
3394             return writeState.getGraph().provider.equals(request.getProvider());
3395         } else {
3396             return request.getProvider() == null;
3397         }
3398     }
3399
3400     boolean plainWrite(WriteGraphImpl graph) {
3401         if(graph == null) return false;
3402         if(graph.writeSupport.writeOnly()) return false;
3403         return true;
3404     }
3405
3406
    /** Shared thread group named "Session Thread Group". */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3408
3409     private void assertNotSession() throws DatabaseException {
3410         Thread current = Thread.currentThread();
3411         if (sessionThreads.contains(current))
3412             throw new ServiceException("Caller is already inside a transaction.");
3413     }
3414
3415     void assertAlive() {
3416         if (!state.isAlive())
3417             throw new RuntimeDatabaseException("Session has been shut down.");
3418     }
3419
    /**
     * Returns the value of the resource as a stream, as provided by
     * {@code querySupport} for the id it assigns to the resource.
     */
    public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValueStream(graph, querySupport.getId(resource));
    }
3423
    /**
     * Returns the value of the resource as a byte array, as provided by
     * {@code querySupport} for the id it assigns to the resource.
     */
    public byte[] getValue(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValue(graph, querySupport.getId(resource));
    }
3427
    /**
     * Acquires the semaphore on behalf of the given request; shorthand for the
     * three-argument variant with a {@code null} procedure.
     */
    <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
        acquire(semaphore, request, null);
    }
3431
3432     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3433         assertAlive();
3434         try {
3435             long tryCount = 0;
3436             // Loop until the semaphore is acquired reporting requests that take a long time.
3437             while (true) {
3438
3439                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3440                     // Acquired OK.
3441                     return;
3442                 } else {
3443                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3444                         ++tryCount;
3445                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3446                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3447                                 * tryCount;
3448                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3449                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3450                     }
3451                 }
3452
3453                 assertAlive();
3454
3455             }
3456         } catch (InterruptedException e) {
3457             e.printStackTrace();
3458             // FIXME: Should perhaps do something else in this case ??
3459         }
3460     }
3461
3462
3463
3464 //    @Override
3465 //    public boolean holdOnToTransactionAfterCancel() {
3466 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3467 //    }
3468 //
3469 //    @Override
3470 //    public boolean holdOnToTransactionAfterCommit() {
3471 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3472 //    }
3473 //
3474 //    @Override
3475 //    public boolean holdOnToTransactionAfterRead() {
3476 //        return transactionPolicy.holdOnToTransactionAfterRead();
3477 //    }
3478 //
3479 //    @Override
3480 //    public void onRelinquish() {
3481 //        transactionPolicy.onRelinquish();
3482 //    }
3483 //
3484 //    @Override
3485 //    public void onRelinquishDone() {
3486 //        transactionPolicy.onRelinquishDone();
3487 //    }
3488 //
3489 //    @Override
3490 //    public void onRelinquishError() {
3491 //        transactionPolicy.onRelinquishError();
3492 //    }
3493
3494
    /** Returns this object, which is itself the session. */
    @Override
    public Session getSession() {
        return this;
    }
3499
3500
    /** Implemented by subclasses; maps the given virtual graph to a provider virtual graph. */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /** Implemented by subclasses; supplies a new resource. */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3503
    /** Forwards the notification for the given thread index to {@code requestManager}. */
    public void ceased(int thread) {

        requestManager.ceased(thread);

    }
3509
    /**
     * Returns the number of query threads, currently the constant 1. The
     * processor-count based alternative is kept below, commented out.
     */
    public int getAmountOfQueryThreads() {
        // This must be a power of two
        return 1;
//        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
    }
3515
    /**
     * Creates a {@link ResourceImpl} for the given key, backed by {@code resourceSupport}.
     * NOTE(review): no lookup is performed here even though
     * {@link ResourceNotFoundException} is declared — confirm whether the constructor can throw it.
     */
    public Resource getResourceByKey(int key) throws ResourceNotFoundException {

        return new ResourceImpl(resourceSupport, key);

    }
3521
    /** Sets the {@code writeOnly} flag of this session. */
    public void acquireWriteOnly() {
        writeOnly = true;
    }
3525
    /**
     * Clears the {@code writeOnly} flag and then calls {@code releaseWrite} on
     * {@code queryProvider2} with the given graph.
     */
    public void releaseWriteOnly(ReadGraphImpl graph) {
        writeOnly = false;
        queryProvider2.releaseWrite(graph);
    }
3530
3531     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3532
3533         long start = System.nanoTime();
3534
3535         while(dirtyPrimitives) {
3536             dirtyPrimitives = false;
3537             getQueryProvider2().performDirtyUpdates(writer);
3538             getQueryProvider2().performScheduledUpdates(writer);
3539         }
3540
3541         fireMetadataListeners(writer, clientChanges);
3542
3543         if(DebugPolicy.PERFORMANCE_DATA) {
3544             long end = System.nanoTime();
3545             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3546         }
3547
3548     }
3549
3550     void removeTemporaryData() {
3551         File platform = Platform.getLocation().toFile();
3552         File tempFiles = new File(platform, "tempFiles");
3553         File temp = new File(tempFiles, "db");
3554         if(!temp.exists()) 
3555             return;
3556         try {
3557             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3558                 @Override
3559                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3560                     try {
3561                         Files.delete(file);
3562                     } catch (IOException e) {
3563                         Logger.defaultLogError(e);
3564                     }
3565                     return FileVisitResult.CONTINUE;
3566                 }
3567             });
3568         } catch (IOException e) {
3569             Logger.defaultLogError(e);
3570         }
3571     }
3572
3573     public void handleCreatedClusters() {
3574         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3575             createdClusters.forEach(new TLongProcedure() {
3576
3577                 @Override
3578                 public boolean execute(long value) {
3579                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3580                     cluster.setImmutable(true, clusterTranslator);
3581                     return true;
3582                 }
3583             });
3584         }
3585         createdClusters.clear();
3586     }
3587
    /** Returns the {@code modificationCounter} of {@code queryProvider2}. */
    @Override
    public Object getModificationCounter() {
        return queryProvider2.modificationCounter;
    }
3592     
    /** Marks an undo point by switching off the combine flag of {@code state}. */
    @Override
    public void markUndoPoint() {
        state.setCombine(false);
    }
3597
3598 }