]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Direct and synchronization-free access to Layer0 resource class for DB
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.EventSupport;
147 import org.simantics.db.service.GraphChangeListenerSupport;
148 import org.simantics.db.service.InitSupport;
149 import org.simantics.db.service.LifecycleSupport;
150 import org.simantics.db.service.ManagementSupport;
151 import org.simantics.db.service.QueryControl;
152 import org.simantics.db.service.SerialisationSupport;
153 import org.simantics.db.service.ServerInformation;
154 import org.simantics.db.service.ServiceActivityMonitor;
155 import org.simantics.db.service.SessionEventSupport;
156 import org.simantics.db.service.SessionMonitorSupport;
157 import org.simantics.db.service.SessionUserSupport;
158 import org.simantics.db.service.StatementSupport;
159 import org.simantics.db.service.TransactionPolicySupport;
160 import org.simantics.db.service.TransactionSupport;
161 import org.simantics.db.service.TransferableGraphSupport;
162 import org.simantics.db.service.UndoRedoSupport;
163 import org.simantics.db.service.VirtualGraphSupport;
164 import org.simantics.db.service.XSupport;
165 import org.simantics.layer0.Layer0;
166 import org.simantics.utils.DataContainer;
167 import org.simantics.utils.Development;
168 import org.simantics.utils.threads.logger.ITask;
169 import org.simantics.utils.threads.logger.ThreadLogger;
170 import org.slf4j.LoggerFactory;
171
172 import gnu.trove.procedure.TLongProcedure;
173 import gnu.trove.set.hash.TLongHashSet;
174
175
176 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
177
178     private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
179
180     protected static final boolean DEBUG        = false;
181
182     private static final boolean DIAGNOSTICS       = false;
183
184     private TransactionPolicySupport transactionPolicy;
185
186     final private ClusterControl clusterControl;
187
188     final protected BuiltinSupportImpl builtinSupport;
189     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
190     final protected ClusterSetsSupport clusterSetsSupport;
191     final protected LifecycleSupportImpl lifecycleSupport;
192
193     protected QuerySupportImpl querySupport;
194     protected ResourceSupportImpl resourceSupport;
195     protected WriteSupport writeSupport;
196     public ClusterTranslator clusterTranslator;
197
198     boolean dirtyPrimitives = false;
199
200     public static final int SERVICE_MODE_CREATE = 2;
201     public static final int SERVICE_MODE_ALLOW = 1;
202     public int serviceMode = 0;
203     public boolean createdImmutableClusters = false;
204     public TLongHashSet createdClusters = new TLongHashSet();
205
206     private Layer0 L0;
207
208     /**
209      * The service locator maintained by the workbench. These services are
210      * initialized during workbench during the <code>init</code> method.
211      */
212     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
213
214     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
215
216     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
217
218     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
219
220     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
221
222     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
223
224     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
225
226     final protected State                                        state              = new State();
227
228     protected GraphSession                                       graphSession       = null;
229
230     protected SessionManager                                     sessionManagerImpl = null;
231
232     protected UserAuthenticationAgent                            authAgent          = null;
233
234     protected UserAuthenticator                                  authenticator      = null;
235
236     protected Resource                                           user               = null;
237
238     protected ClusterStream                                      clusterStream      = null;
239
240     protected SessionRequestManager                         requestManager = null;
241
242     public ClusterTable                                       clusterTable       = null;
243
244     public QueryProcessor                                     queryProvider2 = null;
245
246     ClientChangesImpl                                  clientChanges      = null;
247
248     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
249
250 //    protected long                                               newClusterId       = Constants.NullClusterId;
251
252     protected int                                                flushCounter       = 0;
253
254     protected boolean                                            writeOnly          = false;
255
256     WriteState<?>                                                writeState = null;
257     WriteStateBase<?>                                            delayedWriteState = null;
258     protected Resource defaultClusterSet = null; // If not null then used for newResource().
259
260     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
261
262 //        if (authAgent == null)
263 //            throw new IllegalArgumentException("null authentication agent");
264
265         File t = StaticSessionProperties.virtualGraphStoragePath;
266         if (null == t)
267             t = new File(".");
268         this.clusterTable = new ClusterTable(this, t);
269         this.builtinSupport = new BuiltinSupportImpl(this);
270         this.sessionManagerImpl = sessionManagerImpl;
271         this.user = null;
272 //        this.authAgent = authAgent;
273
274         serviceLocator.registerService(Session.class, this);
275         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
276         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
277         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
278         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
279         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
280         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
281         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
282         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
283         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
284         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
285         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
286         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
287         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
288         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
289         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
290         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
291         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
292         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
293         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
294         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
295         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
296         serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
297         ServiceActivityUpdaterForWriteTransactions.register(this);
298
299         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
300         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
301         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
302         this.lifecycleSupport = new LifecycleSupportImpl(this);
303         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
304         this.transactionPolicy = new TransactionPolicyRelease();
305         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
306         this.clusterControl = new ClusterControlImpl(this);
307         serviceLocator.registerService(ClusterControl.class, clusterControl);
308         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
309         this.clusterSetsSupport.setReadDirectory(t.toPath());
310         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
311         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
312     }
313
314     /*
315      *
316      * Session interface
317      */
318
319
320     @Override
321     public Resource getRootLibrary() {
322         return queryProvider2.getRootLibraryResource();
323     }
324
325     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
326         if (!graphSession.dbSession.refreshEnabled())
327             return;
328         try {
329             getClusterTable().refresh(csid, this, clusterUID);
330         } catch (Throwable t) {
331             LOGGER.error("refresh({}, {}) failed", thread, csid, t);
332         }
333     }
334
335     static final class TaskHelper {
336         private final String name;
337         private Object result;
338         final Semaphore sema = new Semaphore(0);
339         private Throwable throwable = null;
340         final Consumer<DatabaseException> callback = e -> {
341             synchronized (TaskHelper.this) {
342                 throwable = e;
343             }
344         };
345         final Procedure<Object> proc = new Procedure<Object>() {
346             @Override
347             public void execute(Object result) {
348                 callback.accept(null);
349             }
350             @Override
351             public void exception(Throwable t) {
352                 if (t instanceof DatabaseException)
353                     callback.accept((DatabaseException)t);
354                 else
355                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
356             }
357         };
358         final WriteTraits writeTraits = new WriteTraits() {};
359         TaskHelper(String name) {
360             this.name = name;
361         }
362         @SuppressWarnings("unchecked")
363         <T> T getResult() {
364             return (T)result;
365         }
366         void setResult(Object result) {
367             this.result = result;
368         }
369 //        Throwable throwableGet() {
370 //            return throwable;
371 //        }
372         synchronized void throwableSet(Throwable t) {
373             throwable = t;
374         }
375 //        void throwableSet(String t) {
376 //            throwable = new InternalException("" + name + " operation failed. " + t);
377 //        }
378         synchronized void throwableCheck()
379         throws DatabaseException {
380             if (null != throwable)
381                 if (throwable instanceof DatabaseException)
382                     throw (DatabaseException)throwable;
383                 else
384                     throw new DatabaseException("Undo operation failed.", throwable);
385         }
386         void throw_(String message)
387         throws DatabaseException {
388             throw new DatabaseException("" + name + " operation failed. " + message);
389         }
390     }
391
392
393
394     private ListenerBase getListenerBase(Object procedure) {
395         if (procedure instanceof ListenerBase)
396             return (ListenerBase) procedure;
397         else
398             return null;
399     }
400
401     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
402         scheduleRequest(request, callback, notify, null);
403     }
404
405     /* (non-Javadoc)
406      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
407      */
408     @Override
409     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
410
411         assert (request != null);
412
413         if(Development.DEVELOPMENT) {
414             try {
415             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
416                 System.err.println("schedule write '" + request + "'");
417             } catch (Throwable t) {
418                 Logger.defaultLogError(t);
419             }
420         }
421
422         requestManager.scheduleWrite(new SessionTask(null) {
423
424             @Override
425             public void run0(int thread) {
426
427                 if(Development.DEVELOPMENT) {
428                     try {
429                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
430                         System.err.println("perform write '" + request + "'");
431                     } catch (Throwable t) {
432                         Logger.defaultLogError(t);
433                     }
434                 }
435
436                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
437
438                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
439
440                 try {
441
442                     flushCounter = 0;
443                     Disposable.safeDispose(clientChanges);
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
445
446                     VirtualGraph vg = getProvider(request.getProvider());
447
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450
451                         @Override
452                         public void execute(Object result) {
453                             if(callback != null) callback.accept(null);
454                         }
455
456                         @Override
457                         public void exception(Throwable t) {
458                             if(callback != null) callback.accept((DatabaseException)t);
459                         }
460
461                     });
462
463                     assert (null != writer);
464                     writer.asyncBarrier.inc();
465
466                     try {
467                         request.perform(writer);
468                         assert (null != writer);
469                     } catch (Throwable t) {
470                         if (!(t instanceof CancelTransactionException))
471                             LOGGER.error("Write transaction caused an unexpected error, see exception.", t);
472                         writeState.except(t);
473                     } finally {
474                         writer.asyncBarrier.dec();
475 //                        writer.waitAsync(request);
476                     }
477
478
479                     assert(!queryProvider2.cache.dirty);
480
481                 } catch (Throwable e) {
482
483                     // Log it first, just to be safe that the error is always logged.
484                     if (!(e instanceof CancelTransactionException))
485                         LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
486
487 //                    writeState.getGraph().state.barrier.dec();
488 //                    writeState.getGraph().waitAsync(request);
489
490                     writeState.except(e);
491
492 //                    try {
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 //                        // All we can do here is to log those, can't really pass them anywhere.
495 //                        if (callback != null) {
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 //                            else callback.run(new DatabaseException(e));
498 //                        }
499 //                    } catch (Throwable e2) {
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
501 //                    }
502 //                    boolean empty = clusterStream.reallyFlush();
503 //                    int callerThread = -1;
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
506 //                    try {
507 //                        // Send all local changes to server so it can calculate correct reverse change set.
508 //                        // This will call clusterStream.accept().
509 //                        state.cancelCommit(context, clusterStream);
510 //                        if (!empty) {
511 //                            if (!context.isOk()) // this is a blocking operation
512 //                                throw new InternalException("Cancel failed. This should never happen.");
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
514 //                        }
515 //                        state.cancelCommit2(context, clusterStream);
516 //                    } catch (DatabaseException e3) {
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
518 //                    } finally {
519 //                        clusterStream.setOff(false);
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
521 //                    }
522
523                 } finally {
524
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
526
527                 }
528
529                 task.finish();
530
531                 if(Development.DEVELOPMENT) {
532                     try {
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534                         System.err.println("finish write '" + request + "'");
535                     } catch (Throwable t) {
536                         Logger.defaultLogError(t);
537                     }
538                 }
539
540             }
541
542         }, combine);
543
544     }
545
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547         scheduleRequest(request, procedure, notify, null);
548     }
549
550     /* (non-Javadoc)
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
552      */
553     @Override
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
555
556         assert (request != null);
557
558         requestManager.scheduleWrite(new SessionTask(null) {
559
560             @Override
561             public void run0(int thread) {
562
563                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
564
565                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
566
567                 flushCounter = 0;
568                 Disposable.safeDispose(clientChanges);
569                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
570
571                 VirtualGraph vg = getProvider(request.getProvider());
572                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
573
574                 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                 writeState = writeStateT;
576                 assert (null != writer);
577
578                 try {
579                     writer.asyncBarrier.inc();
580                     writeStateT.setResult(request.perform(writer));
581                     assert (null != writer);
582                 } catch (Throwable e) {
583                     writeState.except(e);
584                 } finally {
585                     writer.asyncBarrier.dec();
586                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
587                 }
588
589                 task.finish();
590
591             }
592
593         }, combine);
594
595     }
596
597     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
598         scheduleRequest(request, callback, notify, null);
599     }
600
601     /* (non-Javadoc)
602      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
603      */
604     @Override
605     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
606
607         final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
608
609         assert (request != null);
610
611         requestManager.scheduleWrite(new SessionTask(null) {
612
613             @Override
614             public void run0(int thread) {
615                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
616
617                 Procedure<Object> stateProcedure = new Procedure<Object>() {
618                     @Override
619                     public void execute(Object result) {
620                         if (callback != null)
621                             callback.accept(null);
622                     }
623                     @Override
624                     public void exception(Throwable t) {
625                         if (callback != null) {
626                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
627                             else callback.accept(new DatabaseException(t));
628                         } else
629                             LOGGER.error("Unhandled exception", t);
630                     }
631                 };
632
633                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
634                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
635                 DelayedWriteGraph dwg = null;
636 //                newGraph.state.barrier.inc();
637
638                 try {
639                     dwg = new DelayedWriteGraph(newGraph);
640                     request.perform(dwg);
641                 } catch (Throwable e) {
642                     delayedWriteState.except(e);
643                     total.finish();
644                     dwg.close();
645                     return;
646                 } finally {
647 //                    newGraph.state.barrier.dec();
648 //                    newGraph.waitAsync(request);
649                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
650                 }
651
652                 delayedWriteState = null;
653
654                 ITask task2 = ThreadLogger.task("DelayedWriteCommit");
655                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
656
657                 flushCounter = 0;
658                 Disposable.safeDispose(clientChanges);
659                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
660
661                 acquireWriteOnly();
662
663                 VirtualGraph vg = getProvider(request.getProvider());
664                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
665
666                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
667
668                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
669
670                 assert (null != writer);
671 //                writer.state.barrier.inc();
672
673                 try {
674                     // Cannot cancel
675                     dwg.commit(writer, request);
676
677                         if(defaultClusterSet != null) {
678                                 XSupport xs = getService(XSupport.class);
679                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
680                                 ClusteringSupport cs = getService(ClusteringSupport.class);
681                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
682                         }
683
684                     // This makes clusters available from server
685                     clusterStream.reallyFlush();
686
687                     releaseWriteOnly(writer);
688                     handleUpdatesAndMetadata(writer);
689
690                 } catch (ServiceException e) {
691 //                    writer.state.barrier.dec();
692 //                    writer.waitAsync(null);
693
694                     // These shall be requested from server
695                     clusterTable.removeWriteOnlyClusters();
696                     // This makes clusters available from server
697                     clusterStream.reallyFlush();
698
699                     releaseWriteOnly(writer);
700                     writeState.except(e);
701                 } finally {
702                     // Debugging & Profiling
703                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
704                     task2.finish();
705                     total.finish();
706                 }
707             }
708
709         }, combine);
710
711     }
712
713     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
714         scheduleRequest(request, procedure, notify, null);
715     }
716
717     /* (non-Javadoc)
718      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
719      */
720     @Override
721     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
722         throw new Error("Not implemented");
723     }
724
725     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
726         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
727         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
728             createdClusters.add(cluster.clusterId);
729         }
730         return cluster;
731     }
732
733     class WriteOnlySupport implements WriteSupport {
734
735         ClusterStream stream;
736         ClusterImpl currentCluster;
737
738         public WriteOnlySupport() {
739             this.stream = clusterStream;
740         }
741
742         @Override
743         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
744             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
745         }
746
747         @Override
748         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
749
750             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
751
752             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
753                 if(s != queryProvider2.getRootLibrary())
754                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
755
756             try {
757                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
758             } catch (DatabaseException e) {
759                 Logger.defaultLogError(e);
760                 //throw e;
761             }
762
763             clientChanges.invalidate(s);
764
765             if (cluster.isWriteOnly())
766                 return;
767             queryProvider2.updateStatements(s, p);
768
769         }
770
771         @Override
772         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
773             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
774         }
775
776         @Override
777         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
778
779             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
780             try {
781                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
782             } catch (DatabaseException e) {
783                 Logger.defaultLogError(e);
784             }
785
786             clientChanges.invalidate(rid);
787
788             if (cluster.isWriteOnly())
789                 return;
790             queryProvider2.updateValue(rid);
791
792         }
793
794         @Override
795         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
796
797             if(amount < 65536) {
798                 claimValue(provider,resource, reader.readBytes(null, amount));
799                 return;
800             }
801
802             byte[] bytes = new byte[65536];
803
804             int rid = ((ResourceImpl)resource).id;
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 int left = amount;
808                 while(left > 0) {
809                     int block = Math.min(left, 65536);
810                     reader.readBytes(bytes, block);
811                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
812                     left -= block;
813                 }
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         private void maintainCluster(ClusterImpl before, ClusterI after_) {
827             if(after_ != null && after_ != before) {
828                 ClusterImpl after = (ClusterImpl)after_;
829                 if(currentCluster == before) {
830                     currentCluster = after;
831                 }
832                 clusterTable.replaceCluster(after);
833             }
834         }
835
836         public int createResourceKey(int foreignCounter) throws DatabaseException {
837             if(currentCluster == null) {
838                 currentCluster = getNewResourceCluster();
839             }
840             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
841                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
842                 newCluster.foreignLookup = new byte[foreignCounter];
843                 currentCluster = newCluster;
844                 if (DEBUG)
845                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
846             }
847             return currentCluster.createResource(clusterTranslator);
848         }
849
850         @Override
851         public Resource createResource(VirtualGraph provider) throws DatabaseException {
852             if(currentCluster == null) {
853                 if (null != defaultClusterSet) {
854                         ResourceImpl result = getNewResource(defaultClusterSet);
855                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
856                     return result;
857                 } else {
858                     currentCluster = getNewResourceCluster();
859                 }
860             }
861             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
862                 if (null != defaultClusterSet) {
863                         ResourceImpl result = getNewResource(defaultClusterSet);
864                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
865                     return result;
866                 } else {
867                     currentCluster = getNewResourceCluster();
868                 }
869             }
870             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
871         }
872
873         @Override
874         public Resource createResource(VirtualGraph provider, long clusterId)
875         throws DatabaseException {
876             return getNewResource(clusterId);
877         }
878
879         @Override
880         public Resource createResource(VirtualGraph provider, Resource clusterSet)
881         throws DatabaseException {
882             return getNewResource(clusterSet);
883         }
884
885         @Override
886         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
887                 throws DatabaseException {
888             getNewClusterSet(clusterSet);
889         }
890
891         @Override
892         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
893         throws ServiceException {
894             return containsClusterSet(clusterSet);
895         }
896
897         public void selectCluster(long cluster) {
898                 currentCluster = clusterTable.getClusterByClusterId(cluster);
899                 long setResourceId = clusterSetsSupport.getSet(cluster);
900                 clusterSetsSupport.put(setResourceId, cluster);
901         }
902
903         @Override
904         public Resource setDefaultClusterSet(Resource clusterSet)
905         throws ServiceException {
906                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
907                 if(clusterSet != null) {
908                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
909                         currentCluster = clusterTable.getClusterByClusterId(id);
910                         return result;
911                 } else {
912                         currentCluster = null;
913                         return null;
914                 }
915         }
916
917         @Override
918         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
919
920                  provider = getProvider(provider);
921              if (null == provider) {
922                  int key = ((ResourceImpl)resource).id;
923
924
925
926                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
927 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
928 //                      if(key != queryProvider2.getRootLibrary())
929 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
930
931 //                 try {
932 //                     cluster.removeValue(key, clusterTranslator);
933 //                 } catch (DatabaseException e) {
934 //                     Logger.defaultLogError(e);
935 //                     return;
936 //                 }
937
938                  clusterTable.writeOnlyInvalidate(cluster);
939
940                  try {
941                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
942                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
943                          clusterTranslator.removeValue(cluster);
944                  } catch (DatabaseException e) {
945                          Logger.defaultLogError(e);
946                  }
947
948                  queryProvider2.invalidateResource(key);
949                  clientChanges.invalidate(key);
950
951              } else {
952                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
953                  queryProvider2.updateValue(querySupport.getId(resource));
954                  clientChanges.claimValue(resource);
955              }
956
957
958         }
959
960         @Override
961         public void flush(boolean intermediate) {
962             throw new UnsupportedOperationException();
963         }
964
965         @Override
966         public void flushCluster() {
967             clusterTable.flushCluster(graphSession);
968             if(defaultClusterSet != null) {
969                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
970             }
971             currentCluster = null;
972         }
973
974         @Override
975         public void flushCluster(Resource r) {
976             throw new UnsupportedOperationException("flushCluster resource " + r);
977         }
978
979         @Override
980         public void gc() {
981         }
982
983         @Override
984         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
985                 Resource object) {
986
987                 int s = ((ResourceImpl)subject).id;
988                 int p = ((ResourceImpl)predicate).id;
989                 int o = ((ResourceImpl)object).id;
990
991                 provider = getProvider(provider);
992                 if (null == provider) {
993
994                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
995                         clusterTable.writeOnlyInvalidate(cluster);
996
997                         try {
998
999                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1000
1001                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1002                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1003
1004                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1005                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1006                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1007                                 clusterTranslator.removeStatement(cluster);
1008
1009                                 queryProvider2.invalidateResource(s);
1010                                 clientChanges.invalidate(s);
1011
1012                         } catch (DatabaseException e) {
1013
1014                                 Logger.defaultLogError(e);
1015
1016                         }
1017
1018                         return true;
1019
1020                 } else {
1021                         
1022                         ((VirtualGraphImpl)provider).deny(s, p, o);
1023                         queryProvider2.invalidateResource(s);
1024                         clientChanges.invalidate(s);
1025
1026                         return true;
1027
1028                 }
1029
1030         }
1031
1032         @Override
1033         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1034             throw new UnsupportedOperationException();
1035         }
1036
1037         @Override
1038         public boolean writeOnly() {
1039             return true;
1040         }
1041
1042         @Override
1043         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1044             writeSupport.performWriteRequest(graph, request);
1045         }
1046
1047         @Override
1048         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1049             throw new UnsupportedOperationException();
1050         }
1051
1052         @Override
1053         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1054             throw new UnsupportedOperationException();
1055         }
1056
1057         @Override
1058         public <T> void addMetadata(Metadata data) throws ServiceException {
1059             writeSupport.addMetadata(data);
1060         }
1061
1062         @Override
1063         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1064             return writeSupport.getMetadata(clazz);
1065         }
1066
1067         @Override
1068         public TreeMap<String, byte[]> getMetadata() {
1069             return writeSupport.getMetadata();
1070         }
1071
1072         @Override
1073         public void commitDone(WriteTraits writeTraits, long csid) {
1074             writeSupport.commitDone(writeTraits, csid);
1075         }
1076
1077         @Override
1078         public void clearUndoList(WriteTraits writeTraits) {
1079             writeSupport.clearUndoList(writeTraits);
1080         }
1081         @Override
1082         public int clearMetadata() {
1083             return writeSupport.clearMetadata();
1084         }
1085
1086                 @Override
1087                 public void startUndo() {
1088                         writeSupport.startUndo();
1089                 }
1090     }
1091
1092     class VirtualWriteOnlySupport implements WriteSupport {
1093
1094 //        @Override
1095 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1096 //                Resource object) {
1097 //            throw new UnsupportedOperationException();
1098 //        }
1099
1100         @Override
1101         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1102
1103             TransientGraph impl = (TransientGraph)provider;
1104             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1105             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1106             clientChanges.claim(subject, predicate, object);
1107
1108         }
1109
1110         @Override
1111         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1112
1113             TransientGraph impl = (TransientGraph)provider;
1114             impl.claim(subject, predicate, object);
1115             getQueryProvider2().updateStatements(subject, predicate);
1116             clientChanges.claim(subject, predicate, object);
1117
1118         }
1119
1120         @Override
1121         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1122             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1123         }
1124
1125         @Override
1126         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1127             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1128             getQueryProvider2().updateValue(resource);
1129             clientChanges.claimValue(resource);
1130         }
1131
1132         @Override
1133         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1134             byte[] value = reader.readBytes(null, amount);
1135             claimValue(provider, resource, value);
1136         }
1137
1138         @Override
1139         public Resource createResource(VirtualGraph provider) {
1140             TransientGraph impl = (TransientGraph)provider;
1141             return impl.getResource(impl.newResource(false));
1142         }
1143
1144         @Override
1145         public Resource createResource(VirtualGraph provider, long clusterId) {
1146             throw new UnsupportedOperationException();
1147         }
1148
1149         @Override
1150         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1151         throws DatabaseException {
1152             throw new UnsupportedOperationException();
1153         }
1154
1155         @Override
1156         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1157         throws DatabaseException {
1158             throw new UnsupportedOperationException();
1159         }
1160
1161         @Override
1162         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1163         throws ServiceException {
1164             throw new UnsupportedOperationException();
1165         }
1166
1167         @Override
1168         public Resource setDefaultClusterSet(Resource clusterSet)
1169         throws ServiceException {
1170                 return null;
1171         }
1172
1173         @Override
1174         public void denyValue(VirtualGraph provider, Resource resource) {
1175             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1176             getQueryProvider2().updateValue(querySupport.getId(resource));
1177             // NOTE: this only keeps track of value changes by-resource.
1178             clientChanges.claimValue(resource);
1179         }
1180
1181         @Override
1182         public void flush(boolean intermediate) {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public void flushCluster() {
1188             throw new UnsupportedOperationException();
1189         }
1190
1191         @Override
1192         public void flushCluster(Resource r) {
1193             throw new UnsupportedOperationException("Resource " + r);
1194         }
1195
1196         @Override
1197         public void gc() {
1198         }
1199
1200         @Override
1201         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1202             TransientGraph impl = (TransientGraph) provider;
1203             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1204             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1205             clientChanges.deny(subject, predicate, object);
1206             return true;
1207         }
1208
1209         @Override
1210         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1211             throw new UnsupportedOperationException();
1212         }
1213
1214         @Override
1215         public boolean writeOnly() {
1216             return true;
1217         }
1218
1219         @Override
1220         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1221             throw new UnsupportedOperationException();
1222         }
1223
1224         @Override
1225         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1226             throw new UnsupportedOperationException();
1227         }
1228
1229         @Override
1230         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1231             throw new UnsupportedOperationException();
1232         }
1233
1234         @Override
1235         public <T> void addMetadata(Metadata data) throws ServiceException {
1236             throw new UnsupportedOperationException();
1237         }
1238
1239         @Override
1240         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1241             throw new UnsupportedOperationException();
1242         }
1243
1244         @Override
1245         public TreeMap<String, byte[]> getMetadata() {
1246             throw new UnsupportedOperationException();
1247         }
1248
1249         @Override
1250         public void commitDone(WriteTraits writeTraits, long csid) {
1251         }
1252
1253         @Override
1254         public void clearUndoList(WriteTraits writeTraits) {
1255         }
1256
1257         @Override
1258         public int clearMetadata() {
1259             return 0;
1260         }
1261
1262                 @Override
1263                 public void startUndo() {
1264                 }
1265     }
1266
1267     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1268
1269         try {
1270
1271             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1272
1273             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1274
1275             flushCounter = 0;
1276             Disposable.safeDispose(clientChanges);
1277             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1278
1279             acquireWriteOnly();
1280
1281             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1282
1283             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1284
1285             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1286             writeState = writeStateT;
1287
1288             assert (null != writer);
1289 //            writer.state.barrier.inc();
1290             long start = System.nanoTime();
1291             T result = request.perform(writer);
1292             long duration = System.nanoTime() - start;
1293             if (DEBUG) {
1294                 System.err.println("################");
1295                 System.err.println("WriteOnly duration " + 1e-9*duration);
1296             }
1297             writeStateT.setResult(result);
1298
1299             // This makes clusters available from server
1300             clusterStream.reallyFlush();
1301
1302             // This will trigger query updates
1303             releaseWriteOnly(writer);
1304             assert (null != writer);
1305
1306             handleUpdatesAndMetadata(writer);
1307
1308         } catch (CancelTransactionException e) {
1309
1310             releaseWriteOnly(writeState.getGraph());
1311
1312             clusterTable.removeWriteOnlyClusters();
1313             state.stopWriteTransaction(clusterStream);
1314
1315         } catch (Throwable e) {
1316
1317             e.printStackTrace();
1318
1319             releaseWriteOnly(writeState.getGraph());
1320
1321             clusterTable.removeWriteOnlyClusters();
1322
1323             if (callback != null)
1324                 callback.exception(new DatabaseException(e));
1325
1326             state.stopWriteTransaction(clusterStream);
1327             LOGGER.error("Write transaction caused an unexpected error, see exception.", e);
1328
1329         } finally  {
1330
1331             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1332
1333         }
1334
1335     }
1336
1337     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1338         scheduleRequest(request, callback, notify, null);
1339     }
1340
1341     @Override
1342     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1343
1344         assertAlive();
1345
1346         assert (request != null);
1347
1348         requestManager.scheduleWrite(new SessionTask(null) {
1349
1350             @Override
1351             public void run0(int thread) {
1352
1353                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1354
1355                     try {
1356
1357                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358
1359                         flushCounter = 0;
1360                         Disposable.safeDispose(clientChanges);
1361                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1362
1363                         acquireWriteOnly();
1364
1365                         VirtualGraph vg = getProvider(request.getProvider());
1366                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1367
1368                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1369
1370                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1371
1372                             @Override
1373                             public void execute(Object result) {
1374                                 if(callback != null) callback.accept(null);
1375                             }
1376
1377                             @Override
1378                             public void exception(Throwable t) {
1379                                 if(callback != null) callback.accept((DatabaseException)t);
1380                             }
1381
1382                         });
1383
1384                         assert (null != writer);
1385 //                        writer.state.barrier.inc();
1386
1387                         try {
1388
1389                             request.perform(writer);
1390
1391                         } catch (Throwable e) {
1392
1393 //                            writer.state.barrier.dec();
1394 //                            writer.waitAsync(null);
1395
1396                             releaseWriteOnly(writer);
1397
1398                             clusterTable.removeWriteOnlyClusters();
1399
1400                             if(!(e instanceof CancelTransactionException)) {
1401                                 if (callback != null)
1402                                     callback.accept(new DatabaseException(e));
1403                             }
1404
1405                             writeState.except(e);
1406
1407                             return;
1408
1409                         }
1410
1411                         // This makes clusters available from server
1412                         boolean empty = clusterStream.reallyFlush();
1413                         // This was needed to make WO requests call metadata listeners.
1414                         // NOTE: the calling event does not contain clientChanges information.
1415                         if (!empty && clientChanges.isEmpty())
1416                             clientChanges.setNotEmpty(true);
1417                         releaseWriteOnly(writer);
1418                         assert (null != writer);
1419                     } finally  {
1420
1421                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1422
1423                     }
1424
1425
1426                 task.finish();
1427
1428             }
1429
1430         }, combine);
1431
1432     }
1433
1434     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1435         scheduleRequest(request, callback, notify, null);
1436     }
1437
1438     /* (non-Javadoc)
1439      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1440      */
1441     @Override
1442     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1443
1444         assert (request != null);
1445
1446         requestManager.scheduleWrite(new SessionTask(null) {
1447
1448             @Override
1449             public void run0(int thread) {
1450
1451                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1452
1453                 performWriteOnly(request, notify, callback);
1454
1455                 task.finish();
1456
1457             }
1458
1459         }, combine);
1460
1461     }
1462
1463     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1464
1465         assert (request != null);
1466         assert (procedure != null);
1467
1468         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1469
1470             @Override
1471             public void run0(int thread) {
1472
1473                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1474
1475                 ITask task = ThreadLogger.task(request);
1476
1477                 ListenerBase listener = getListenerBase(procedure);
1478
1479                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1480
1481                 // This is never synced but increase to prevent it from visiting 0
1482                 newGraph.asyncBarrier.inc();
1483
1484                 try {
1485
1486                     if (listener != null) {
1487
1488                         try {
1489                                 
1490                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1491
1492                                         @Override
1493                                         public void exception(AsyncReadGraph graph, Throwable t) {
1494                                                 procedure.exception(graph, t);
1495                                                 if(throwable != null) {
1496                                                         throwable.set(t);
1497                                                 } else {
1498                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1499                                                 }
1500                                         }
1501
1502                                         @Override
1503                                         public void execute(AsyncReadGraph graph, T t) {
1504                                                 if(result != null) result.set(t);
1505                                                 procedure.execute(graph, t);
1506                                         }
1507
1508                                 };
1509                                 
1510                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1511                                 
1512                         } catch (Throwable t) {
1513                             // This is handled by the AsyncProcedure
1514                                 //Logger.defaultLogError("Internal error", t);
1515                         }
1516
1517                     } else {
1518
1519                         try {
1520
1521                             T t = request.perform(newGraph);
1522
1523                             try {
1524
1525                                 if(result != null) result.set(t);
1526                                 procedure.execute(newGraph, t);
1527
1528                             } catch (Throwable th) {
1529
1530                                 if(throwable != null) {
1531                                     throwable.set(th);
1532                                 } else {
1533                                     LOGGER.error("Unhandled exception", th);
1534                                 }
1535
1536                             }
1537
1538                         } catch (Throwable t) {
1539
1540                             if (DEBUG)
1541                                 t.printStackTrace();
1542
1543                             if(throwable != null) {
1544                                 throwable.set(t);
1545                             } else {
1546                                 LOGGER.error("Unhandled exception", t);
1547                             }
1548
1549                             try {
1550
1551                                 procedure.exception(newGraph, t);
1552
1553                             } catch (Throwable t2) {
1554
1555                                 if(throwable != null) {
1556                                     throwable.set(t2);
1557                                 } else {
1558                                     LOGGER.error("Unhandled exception", t2);
1559                                 }
1560
1561                             }
1562
1563                         }
1564                         
1565                         task.finish();
1566
1567                     }
1568
1569                 } finally {
1570
1571                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1572
1573                 }
1574
1575             }
1576
1577         });
1578
1579     }
1580
1581     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1582
1583         assert (request != null);
1584         assert (procedure != null);
1585
1586         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1587
1588         requestManager.scheduleRead(new SessionRead(null, notify) {
1589
1590             @Override
1591             public void run0(int thread) {
1592
1593                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1594
1595                 ITask task = ThreadLogger.task(request);
1596
1597                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1598                 newGraph.asyncBarrier.inc();
1599
1600                 try {
1601
1602                     if (listener != null) {
1603
1604                         try {
1605                             QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1606                         } catch (DatabaseException e) {
1607                             LOGGER.error("Unhandled query exception", e);
1608                         }
1609
1610                     } else {
1611
1612                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
1613
1614                             public void execute(AsyncReadGraph graph_, T result) {
1615                                 task.finish();
1616                                 super.execute(graph_, result);
1617                             }
1618
1619                             public void exception(AsyncReadGraph graph_, Throwable t) {
1620                                 task.finish();
1621                                 super.exception(graph_, t);
1622                             }
1623
1624                         };
1625
1626                         try {
1627                             wrap.performSync(request);
1628                         } catch (DatabaseException e) {
1629                             LOGGER.error("Unhandled query exception", e);
1630                         }
1631
1632                     }
1633
1634                 } finally {
1635
1636                     newGraph.asyncBarrier.dec();
1637
1638                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1639
1640                 }
1641
1642             }
1643
1644         });
1645
1646     }
1647
1648     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1649
1650         assert (request != null);
1651         assert (procedure != null);
1652
1653         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1654
1655         int sync = notify != null ? thread : -1;
1656
1657         requestManager.scheduleRead(new SessionRead(null, notify) {
1658
1659             @Override
1660             public void run0(int thread) {
1661
1662                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1663
1664                 ListenerBase listener = getListenerBase(procedure);
1665
1666                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1667
1668                 try {
1669
1670                     if (listener != null) {
1671
1672                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1673
1674 //                        newGraph.waitAsync(request);
1675
1676                     } else {
1677
1678                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1679
1680                         try {
1681
1682                             request.perform(newGraph, wrapper);
1683
1684                         } catch (Throwable t) {
1685
1686                             t.printStackTrace();
1687
1688                         }
1689
1690                     }
1691
1692                 } finally {
1693
1694                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1695
1696                 }
1697
1698             }
1699
1700         });
1701
1702     }
1703     
1704     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1705
1706         assert (request != null);
1707         assert (procedure != null);
1708
1709         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1710
1711         int sync = notify != null ? thread : -1;
1712
1713         requestManager.scheduleRead(new SessionRead(null, notify) {
1714
1715             @Override
1716             public void run0(int thread) {
1717
1718                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1719
1720                 ListenerBase listener = getListenerBase(procedure);
1721
1722                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1723
1724                 try {
1725
1726                     if (listener != null) {
1727
1728                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1729
1730 //                        newGraph.waitAsync(request);
1731
1732                     } else {
1733
1734                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1735
1736                         try {
1737
1738                             request.perform(newGraph, wrapper);
1739
1740                         } catch (Throwable t) {
1741
1742                             t.printStackTrace();
1743
1744                         }
1745
1746                     }
1747
1748                 } finally {
1749
1750                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1751
1752                 }
1753
1754             }
1755
1756         });
1757
1758     }
1759
1760     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1761
1762         assert (request != null);
1763         assert (procedure != null);
1764
1765         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1766
1767         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1768
1769             @Override
1770             public void run0(int thread) {
1771
1772                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1773
1774                 ListenerBase listener = getListenerBase(procedure);
1775
1776                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1777
1778                 try {
1779
1780                     if (listener != null) {
1781
1782                         try {
1783                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1784                         } catch (DatabaseException e) {
1785                             Logger.defaultLogError(e);
1786                         }
1787
1788                     } else {
1789
1790 //                        newGraph.state.barrier.inc();
1791
1792                         request.register(newGraph, new Listener<T>() {
1793
1794                             @Override
1795                             public void exception(Throwable t) {
1796                                 if(throwable != null) throwable.set(t);
1797                                 procedure.exception(t);
1798 //                                newGraph.state.barrier.dec();
1799                             }
1800
1801                             @Override
1802                             public void execute(T t) {
1803                                 if(result != null) result.set(t);
1804                                 procedure.execute(t);
1805 //                                newGraph.state.barrier.dec();
1806                             }
1807
1808                             @Override
1809                             public boolean isDisposed() {
1810                                 return true;
1811                             }
1812
1813                         });
1814
1815 //                        newGraph.waitAsync(request);
1816
1817                     }
1818
1819                 } finally {
1820
1821                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1822
1823                 }
1824
1825             }
1826
1827         });
1828
1829     }
1830
1831
1832     @Override
1833     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1834
1835         scheduleRequest(request, procedure, null, null, null);
1836
1837     }
1838
1839     @Override
1840     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1841         assertNotSession();
1842         Semaphore notify = new Semaphore(0);
1843         DataContainer<Throwable> container = new DataContainer<Throwable>();
1844         DataContainer<T> result = new DataContainer<T>();
1845         scheduleRequest(request, procedure, notify, container, result);
1846         acquire(notify, request);
1847         Throwable throwable = container.get();
1848         if(throwable != null) {
1849             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1850             else throw new DatabaseException("Unexpected exception", throwable);
1851         }
1852         return result.get();
1853     }
1854
1855     @Override
1856     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1857         assertNotSession();
1858         Semaphore notify = new Semaphore(0);
1859         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1860         final DataContainer<T> resultContainer = new DataContainer<T>();
1861         scheduleRequest(request, new AsyncProcedure<T>() {
1862             @Override
1863             public void exception(AsyncReadGraph graph, Throwable throwable) {
1864                 exceptionContainer.set(throwable);
1865                 procedure.exception(graph, throwable);
1866             }
1867             @Override
1868             public void execute(AsyncReadGraph graph, T result) {
1869                 resultContainer.set(result);
1870                 procedure.execute(graph, result);
1871             }
1872         }, getListenerBase(procedure), notify);
1873         acquire(notify, request, procedure);
1874         Throwable throwable = exceptionContainer.get();
1875         if (throwable != null) {
1876             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877             else throw new DatabaseException("Unexpected exception", throwable);
1878         }
1879         return resultContainer.get();
1880     }
1881
1882
1883     @Override
1884     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1885         assertNotSession();
1886         Semaphore notify = new Semaphore(0);
1887         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1888         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1889             @Override
1890             public void exception(AsyncReadGraph graph, Throwable throwable) {
1891                 exceptionContainer.set(throwable);
1892                 procedure.exception(graph, throwable);
1893             }
1894             @Override
1895             public void execute(AsyncReadGraph graph, T result) {
1896                 procedure.execute(graph, result);
1897             }
1898             @Override
1899             public void finished(AsyncReadGraph graph) {
1900                 procedure.finished(graph);
1901             }
1902         }, notify);
1903         acquire(notify, request, procedure);
1904         Throwable throwable = exceptionContainer.get();
1905         if (throwable != null) {
1906             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1907             else throw new DatabaseException("Unexpected exception", throwable);
1908         }
1909         // TODO: implement return value
1910         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1911         return null;
1912     }
1913
1914     @Override
1915     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1916         return syncRequest(request, new ProcedureAdapter<T>());
1917
1918
1919 //        assert(request != null);
1920 //
1921 //        final DataContainer<T> result = new DataContainer<T>();
1922 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1923 //
1924 //        syncRequest(request, new Procedure<T>() {
1925 //
1926 //            @Override
1927 //            public void execute(T t) {
1928 //                result.set(t);
1929 //            }
1930 //
1931 //            @Override
1932 //            public void exception(Throwable t) {
1933 //                exception.set(t);
1934 //            }
1935 //
1936 //        });
1937 //
1938 //        Throwable t = exception.get();
1939 //        if(t != null) {
1940 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1941 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1942 //        }
1943 //
1944 //        return result.get();
1945
1946     }
1947
1948     @Override
1949     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1950         return syncRequest(request, (Procedure<T>)procedure);
1951     }
1952
1953
1954 //    @Override
1955 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1956 //        assertNotSession();
1957 //        Semaphore notify = new Semaphore(0);
1958 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1959 //        DataContainer<T> result = new DataContainer<T>();
1960 //        scheduleRequest(request, procedure, notify, container, result);
1961 //        acquire(notify, request);
1962 //        Throwable throwable = container.get();
1963 //        if(throwable != null) {
1964 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1965 //            else throw new DatabaseException("Unexpected exception", throwable);
1966 //        }
1967 //        return result.get();
1968 //    }
1969
1970
1971     @Override
1972     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1973         assertNotSession();
1974         Semaphore notify = new Semaphore(0);
1975         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1976         final DataContainer<T> result = new DataContainer<T>();
1977         scheduleRequest(request, procedure, notify, container, result);
1978         acquire(notify, request);
1979         Throwable throwable = container.get();
1980         if (throwable != null) {
1981             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1982             else throw new DatabaseException("Unexpected exception", throwable);
1983         }
1984         return result.get();
1985     }
1986
1987     @Override
1988     public void syncRequest(Write request) throws DatabaseException  {
1989         assertNotSession();
1990         assertAlive();
1991         Semaphore notify = new Semaphore(0);
1992         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1993         scheduleRequest(request, e -> exception.set(e), notify);
1994         acquire(notify, request);
1995         if(exception.get() != null) throw exception.get();
1996     }
1997
1998     @Override
1999     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2000         assertNotSession();
2001         Semaphore notify = new Semaphore(0);
2002         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2003         final DataContainer<T> result = new DataContainer<T>();
2004         scheduleRequest(request, new Procedure<T>() {
2005
2006             @Override
2007             public void exception(Throwable t) {
2008               exception.set(t);
2009             }
2010
2011             @Override
2012             public void execute(T t) {
2013               result.set(t);
2014             }
2015
2016         }, notify);
2017         acquire(notify, request);
2018         if(exception.get() != null) {
2019             Throwable t = exception.get();
2020             if(t instanceof DatabaseException) throw (DatabaseException)t;
2021             else throw new DatabaseException(t);
2022         }
2023         return result.get();
2024     }
2025
2026     @Override
2027     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2028         assertNotSession();
2029         Semaphore notify = new Semaphore(0);
2030         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2031         final DataContainer<T> result = new DataContainer<T>();
2032         scheduleRequest(request, new Procedure<T>() {
2033
2034             @Override
2035             public void exception(Throwable t) {
2036               exception.set(t);
2037             }
2038
2039             @Override
2040             public void execute(T t) {
2041               result.set(t);
2042             }
2043
2044         }, notify);
2045         acquire(notify, request);
2046         if(exception.get() != null) {
2047             Throwable t = exception.get();
2048             if(t instanceof DatabaseException) throw (DatabaseException)t;
2049             else throw new DatabaseException(t);
2050         }
2051         return result.get();
2052     }
2053
2054     @Override
2055     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2056         assertNotSession();
2057         Semaphore notify = new Semaphore(0);
2058         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2059         final DataContainer<T> result = new DataContainer<T>();
2060         scheduleRequest(request, new Procedure<T>() {
2061
2062             @Override
2063             public void exception(Throwable t) {
2064               exception.set(t);
2065             }
2066
2067             @Override
2068             public void execute(T t) {
2069               result.set(t);
2070             }
2071
2072         }, notify);
2073         acquire(notify, request);
2074         if(exception.get() != null) {
2075             Throwable t = exception.get();
2076             if(t instanceof DatabaseException) throw (DatabaseException)t;
2077             else throw new DatabaseException(t);
2078         }
2079         return result.get();
2080     }
2081
2082     @Override
2083     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2084         assertNotSession();
2085         Semaphore notify = new Semaphore(0);
2086         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2087         scheduleRequest(request, e -> exception.set(e), notify);
2088         acquire(notify, request);
2089         if(exception.get() != null) throw exception.get();
2090     }
2091
2092     @Override
2093     public void syncRequest(WriteOnly request) throws DatabaseException  {
2094         assertNotSession();
2095         assertAlive();
2096         Semaphore notify = new Semaphore(0);
2097         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2098         scheduleRequest(request, e -> exception.set(e), notify);
2099         acquire(notify, request);
2100         if(exception.get() != null) throw exception.get();
2101     }
2102
2103     /*
2104      *
2105      * GraphRequestProcessor interface
2106      */
2107
2108     @Override
2109     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2110
2111         scheduleRequest(request, procedure, null, null);
2112
2113     }
2114
2115     @Override
2116     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2117
2118         scheduleRequest(request, procedure, null);
2119
2120     }
2121
2122     @Override
2123     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2124
2125         scheduleRequest(request, procedure, null, null, null);
2126
2127     }
2128
2129     @Override
2130     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2131
2132         scheduleRequest(request, callback, null);
2133
2134     }
2135
2136     @Override
2137     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2138
2139         scheduleRequest(request, procedure, null);
2140
2141     }
2142
2143     @Override
2144     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2145
2146         scheduleRequest(request, procedure, null);
2147
2148     }
2149
2150     @Override
2151     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2152
2153         scheduleRequest(request, procedure, null);
2154
2155     }
2156
2157     @Override
2158     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2159
2160         scheduleRequest(request, callback, null);
2161
2162     }
2163
2164     @Override
2165     public void asyncRequest(final Write r) {
2166         asyncRequest(r, null);
2167     }
2168
2169     @Override
2170     public void asyncRequest(final DelayedWrite r) {
2171         asyncRequest(r, null);
2172     }
2173
2174     @Override
2175     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2176
2177         scheduleRequest(request, callback, null);
2178
2179     }
2180
2181     @Override
2182     public void asyncRequest(final WriteOnly request) {
2183
2184         asyncRequest(request, null);
2185
2186     }
2187
2188     @Override
2189     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2190         r.request(this, procedure);
2191     }
2192
2193     @Override
2194     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2195         r.request(this, procedure);
2196     }
2197
2198     @Override
2199     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2200         r.request(this, procedure);
2201     }
2202
2203     @Override
2204     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2205         r.request(this, procedure);
2206     }
2207
2208     @Override
2209     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2210         r.request(this, procedure);
2211     }
2212
2213     @Override
2214     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2215         r.request(this, procedure);
2216     }
2217
2218     @Override
2219     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2220         return r.request(this);
2221     }
2222
2223     @Override
2224     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2225         return r.request(this);
2226     }
2227
2228     @Override
2229     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2230         r.request(this, procedure);
2231     }
2232
2233     @Override
2234     public <T> void async(WriteInterface<T> r) {
2235         r.request(this, new ProcedureAdapter<T>());
2236     }
2237
2238     //@Override
2239     public void incAsync() {
2240         state.incAsync();
2241     }
2242
2243     //@Override
2244     public void decAsync() {
2245         state.decAsync();
2246     }
2247
2248     public long getCluster(ResourceImpl resource) {
2249         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2250         return cluster.getClusterId();
2251     }
2252
2253     public long getCluster(int id) {
2254         if (clusterTable == null)
2255             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2256         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2257     }
2258
2259     public ResourceImpl getResource(int id) {
2260         return new ResourceImpl(resourceSupport, id);
2261     }
2262
2263     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2264         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2265         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2266         int key = proxy.getClusterKey();
2267         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2268         return new ResourceImpl(resourceSupport, resourceKey);
2269     }
2270
2271     public ResourceImpl getResource2(int id) {
2272         assert (id != 0);
2273         return new ResourceImpl(resourceSupport, id);
2274     }
2275
2276     final public int getId(ResourceImpl impl) {
2277         return impl.id;
2278     }
2279
2280     public static final Charset UTF8 = Charset.forName("utf-8");
2281
2282
2283
2284
2285     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2286         for(TransientGraph g : support.providers) {
2287             if(g.isPending(subject)) return false;
2288         }
2289         return true;
2290     }
2291
2292     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2293         for(TransientGraph g : support.providers) {
2294             if(g.isPending(subject, predicate)) return false;
2295         }
2296         return true;
2297     }
2298
2299     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2300
2301         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2302
2303             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2304
2305             @Override
2306             public void accept(ReadGraphImpl graph) {
2307                 if(ready.decrementAndGet() == 0) {
2308                     runnable.accept(graph);
2309                 }
2310             }
2311
2312         };
2313
2314         for(TransientGraph g : support.providers) {
2315             if(g.isPending(subject)) {
2316                 try {
2317                     g.load(graph, subject, composite);
2318                 } catch (DatabaseException e) {
2319                     e.printStackTrace();
2320                 }
2321             } else {
2322                 composite.accept(graph);
2323             }
2324         }
2325
2326         composite.accept(graph);
2327
2328     }
2329
2330     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2331
2332         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2333
2334             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2335
2336             @Override
2337             public void accept(ReadGraphImpl graph) {
2338                 if(ready.decrementAndGet() == 0) {
2339                     runnable.accept(graph);
2340                 }
2341             }
2342
2343         };
2344
2345         for(TransientGraph g : support.providers) {
2346             if(g.isPending(subject, predicate)) {
2347                 try {
2348                     g.load(graph, subject, predicate, composite);
2349                 } catch (DatabaseException e) {
2350                     e.printStackTrace();
2351                 }
2352             } else {
2353                 composite.accept(graph);
2354             }
2355         }
2356
2357         composite.accept(graph);
2358
2359     }
2360
2361 //    void dumpHeap() {
2362 //
2363 //        try {
2364 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2365 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2366 //                    "d:/heap" + flushCounter + ".txt", true);
2367 //        } catch (IOException e) {
2368 //            e.printStackTrace();
2369 //        }
2370 //
2371 //    }
2372
2373     void fireReactionsToSynchronize(ChangeSet cs) {
2374
2375         // Do not fire empty events
2376         if (cs.isEmpty())
2377             return;
2378
2379         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2380 //        g.state.barrier.inc();
2381
2382         try {
2383
2384             if (!cs.isEmpty()) {
2385                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2386                 for (ChangeListener l : changeListeners2) {
2387                     try {
2388                         l.graphChanged(e2);
2389                     } catch (Exception ex) {
2390                         ex.printStackTrace();
2391                     }
2392                 }
2393             }
2394
2395         } finally {
2396
2397 //            g.state.barrier.dec();
2398 //            g.waitAsync(null);
2399
2400         }
2401
2402     }
2403
2404     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2405         try {
2406
2407             // Do not fire empty events
2408             if (cs2.isEmpty())
2409                 return;
2410
2411 //            graph.restart();
2412 //            graph.state.barrier.inc();
2413
2414             try {
2415
2416                 if (!cs2.isEmpty()) {
2417
2418                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2419                     for (ChangeListener l : changeListeners2) {
2420                         try {
2421                             l.graphChanged(e2);
2422 //                            System.out.println("changelistener " + l);
2423                         } catch (Exception ex) {
2424                             ex.printStackTrace();
2425                         }
2426                     }
2427
2428                 }
2429
2430             } finally {
2431
2432 //                graph.state.barrier.dec();
2433 //                graph.waitAsync(null);
2434
2435             }
2436         } catch (Throwable t) {
2437             t.printStackTrace();
2438
2439         }
2440     }
2441     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2442
2443         try {
2444
2445             // Do not fire empty events
2446             if (cs2.isEmpty())
2447                 return;
2448
2449             // Do not fire on virtual requests
2450             if(graph.getProvider() != null)
2451                 return;
2452
2453             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2454             reactionGraph.asyncBarrier.inc();
2455
2456             try {
2457
2458                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2459                 for (ChangeListener l : metadataListeners) {
2460                     try {
2461                         l.graphChanged(e2);
2462                     } catch (Throwable ex) {
2463                         LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2464                     }
2465                 }
2466
2467             } finally {
2468
2469                 reactionGraph.asyncBarrier.dec();
2470
2471             }
2472
2473         } catch (Throwable t) {
2474             LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2475         }
2476
2477     }
2478
2479
2480     /*
2481      * (non-Javadoc)
2482      *
2483      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2484      */
2485     @Override
2486     public <T> T getService(Class<T> api) {
2487         T t = peekService(api);
2488         if (t == null) {
2489             if (state.isClosed())
2490                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2491             throw new ServiceNotFoundException(this, api);
2492         }
2493         return t;
2494     }
2495
2496     /**
2497      * @return
2498      */
2499     protected abstract ServerInformation getCachedServerInformation();
2500
2501         private Class<?> serviceKey1 = null;
2502         private Class<?> serviceKey2 = null;
2503         private Object service1 = null;
2504         private Object service2 = null;
2505
2506     /*
2507      * (non-Javadoc)
2508      *
2509      * @see
2510      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2511      */
2512     @SuppressWarnings("unchecked")
2513     @Override
2514     public <T> T peekService(Class<T> api) {
2515         if (Layer0.class == api)
2516             return (T) L0;
2517
2518         synchronized (this) {
2519             if (serviceKey1 == api) {
2520                 return (T) service1;
2521             }
2522             if (serviceKey2 == api) {
2523                 // Promote this key
2524                 Object result = service2;
2525                 service2 = service1;
2526                 serviceKey2 = serviceKey1;
2527                 service1 = result;
2528                 serviceKey1 = api;
2529                 return (T)result;
2530             }
2531
2532             if (ServerInformation.class == api)
2533                 return (T) getCachedServerInformation();
2534             else if (WriteGraphImpl.class == api)
2535                 return (T) writeState.getGraph();
2536             else if (ClusterBuilder.class == api)
2537                 return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2538             else if (ClusterBuilderFactory.class == api)
2539                 return (T)new ClusterBuilderFactoryImpl(this);
2540
2541             service2 = service1;
2542             serviceKey2 = serviceKey1;
2543
2544             service1 = serviceLocator.peekService(api);
2545             serviceKey1 = api;
2546
2547             return (T)service1;
2548         }
2549     }
2550
2551     /*
2552      * (non-Javadoc)
2553      *
2554      * @see
2555      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2556      */
2557     @Override
2558     public boolean hasService(Class<?> api) {
2559         return serviceLocator.hasService(api);
2560     }
2561
2562     /**
2563      * @param api the api that must be implemented by the specified service
2564      * @param service the service implementation
2565      */
2566     @Override
2567     public <T> void registerService(Class<T> api, T service) {
2568         if(Layer0.class == api) {
2569                 L0 = (Layer0)service;
2570                 return;
2571         }
2572         serviceLocator.registerService(api, service);
2573         if (TransactionPolicySupport.class == api) {
2574             transactionPolicy = (TransactionPolicySupport)service;
2575             state.resetTransactionPolicy();
2576         }
2577         if (api == serviceKey1)
2578                 service1 = service;
2579         else if (api == serviceKey2)
2580                 service2 = service;
2581     }
2582
2583     // ----------------
2584     // SessionMonitor
2585     // ----------------
2586
2587     void fireSessionVariableChange(String variable) {
2588         for (MonitorHandler h : monitorHandlers) {
2589             MonitorContext ctx = monitorContexts.getLeft(h);
2590             assert ctx != null;
2591             // SafeRunner functionality repeated here to avoid dependency.
2592             try {
2593                 h.valuesChanged(ctx);
2594             } catch (Exception e) {
2595                 LOGGER.error("monitor handler notification produced the following exception", e);
2596             } catch (LinkageError e) {
2597                 LOGGER.error("monitor handler notification produced a linkage error", e);
2598             }
2599         }
2600     }
2601
2602
2603     class ResourceSerializerImpl implements ResourceSerializer {
2604
2605         public long createRandomAccessId(int id) throws DatabaseException {
2606             if(id < 0)
2607                 return id;
2608             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2609             long cluster = getCluster(id);
2610             if (0 == cluster)
2611                 return 0; // Better to return 0 then invalid id.
2612             long result = ClusterTraitsBase.createResourceId(cluster, index);
2613             return result;
2614         }
2615
2616         @Override
2617         public long getRandomAccessId(Resource resource) throws DatabaseException {
2618
2619             ResourceImpl resourceImpl = (ResourceImpl) resource;
2620             return createRandomAccessId(resourceImpl.id);
2621
2622         }
2623
2624         @Override
2625         public int getTransientId(Resource resource) throws DatabaseException {
2626
2627             ResourceImpl resourceImpl = (ResourceImpl) resource;
2628             return resourceImpl.id;
2629
2630         }
2631
2632         @Override
2633         public String createRandomAccessId(Resource resource)
2634         throws InvalidResourceReferenceException {
2635
2636             if(resource == null) throw new IllegalArgumentException();
2637
2638             ResourceImpl resourceImpl = (ResourceImpl) resource;
2639
2640             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2641
2642             int r;
2643             try {
2644                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2645             } catch (DatabaseException e1) {
2646                 throw new InvalidResourceReferenceException(e1);
2647             }
2648             try {
2649                 // Serialize as '<resource index>_<cluster id>'
2650                 return "" + r + "_" + getCluster(resourceImpl);
2651             } catch (Throwable e) {
2652                 e.printStackTrace();
2653                 throw new InvalidResourceReferenceException(e);
2654             } finally {
2655             }
2656         }
2657
2658         public int getTransientId(long serialized) throws DatabaseException {
2659             if (serialized <= 0)
2660                 return (int)serialized;
2661             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2662             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2663             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2664             if (cluster == null)
2665                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2666             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2667             return key;
2668         }
2669
2670         @Override
2671         public Resource getResource(long randomAccessId) throws DatabaseException {
2672             return getResourceByKey(getTransientId(randomAccessId));
2673         }
2674
2675         @Override
2676         public Resource getResource(int transientId) throws DatabaseException {
2677             return getResourceByKey(transientId);
2678         }
2679
2680         @Override
2681         public Resource getResource(String randomAccessId)
2682         throws InvalidResourceReferenceException {
2683             try {
2684                 int i = randomAccessId.indexOf('_');
2685                 if (i == -1)
2686                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2687                             + randomAccessId + "'");
2688                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2689                 if(r < 0) return getResourceByKey(r);
2690                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2691                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2692                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2693                 if (cluster.hasResource(key, clusterTranslator))
2694                     return getResourceByKey(key);
2695             } catch (InvalidResourceReferenceException e) {
2696                 throw e;
2697             } catch (NumberFormatException e) {
2698                 throw new InvalidResourceReferenceException(e);
2699             } catch (Throwable e) {
2700                 e.printStackTrace();
2701                 throw new InvalidResourceReferenceException(e);
2702             } finally {
2703             }
2704             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2705         }
2706
2707         @Override
2708         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2709             try {
2710                 return true;
2711             } catch (Throwable e) {
2712                 e.printStackTrace();
2713                 throw new InvalidResourceReferenceException(e);
2714             } finally {
2715             }
2716         }
2717     }
2718
2719     // Copied from old SessionImpl
2720
2721     void check() {
2722         if (state.isClosed())
2723             throw new Error("Session closed.");
2724     }
2725
2726
2727
2728     public ResourceImpl getNewResource(long clusterId) {
2729         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2730         int newId;
2731         try {
2732             newId = cluster.createResource(clusterTranslator);
2733         } catch (DatabaseException e) {
2734             Logger.defaultLogError(e);
2735             return null;
2736         }
2737         return new ResourceImpl(resourceSupport, newId);
2738     }
2739
2740     public ResourceImpl getNewResource(Resource clusterSet)
2741     throws DatabaseException {
2742         long resourceId = clusterSet.getResourceId();
2743         Long clusterId = clusterSetsSupport.get(resourceId);
2744         if (null == clusterId)
2745             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2746         if (Constants.NewClusterId == clusterId) {
2747             clusterId = getService(ClusteringSupport.class).createCluster();
2748             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749                 createdClusters.add(clusterId);
2750             clusterSetsSupport.put(resourceId, clusterId);
2751             return getNewResource(clusterId);
2752         } else {
2753             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2754             ResourceImpl result;
2755             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2756                 clusterId = getService(ClusteringSupport.class).createCluster();
2757                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2758                     createdClusters.add(clusterId);
2759                 clusterSetsSupport.put(resourceId, clusterId);
2760                 return getNewResource(clusterId);
2761             } else {
2762                 result = getNewResource(clusterId);
2763                 int resultKey = querySupport.getId(result);
2764                 long resultCluster = querySupport.getClusterId(resultKey);
2765                 if (clusterId != resultCluster)
2766                     clusterSetsSupport.put(resourceId, resultCluster);
2767                 return result;
2768             }
2769         }
2770     }
2771
2772     public void getNewClusterSet(Resource clusterSet)
2773     throws DatabaseException {
2774         if (DEBUG)
2775             System.out.println("new cluster set=" + clusterSet);
2776         long resourceId = clusterSet.getResourceId();
2777         if (clusterSetsSupport.containsKey(resourceId))
2778             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2779         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2780     }
2781     public boolean containsClusterSet(Resource clusterSet)
2782     throws ServiceException {
2783         long resourceId = clusterSet.getResourceId();
2784         return clusterSetsSupport.containsKey(resourceId);
2785     }
2786     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2787         Resource r = defaultClusterSet;
2788         defaultClusterSet = clusterSet;
2789         return r;
2790     }
2791     void printDiagnostics() {
2792
2793         if (DIAGNOSTICS) {
2794
2795             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2796             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2797             for(ClusterI cluster : clusterTable.getClusters()) {
2798                 try {
2799                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2800                         residentClusters.set(residentClusters.get() + 1);
2801                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2802                     }
2803                 } catch (DatabaseException e) {
2804                     Logger.defaultLogError(e);
2805                 }
2806             }
2807
2808             System.out.println("--------------------------------");
2809             System.out.println("Cluster information:");
2810             System.out.println("-amount of resident clusters=" + residentClusters.get());
2811             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2812
2813             for(ClusterI cluster : clusterTable.getClusters()) {
2814                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2815                 try {
2816                     if (!cluster.isLoaded())
2817                         System.out.println("not loaded]");
2818                     else if (cluster.isEmpty())
2819                         System.out.println("is empty]");
2820                     else {
2821                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2822                         System.out.print(",references=" + cluster.getReferenceCount());
2823                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2824                     }
2825                 } catch (DatabaseException e) {
2826                     Logger.defaultLogError(e);
2827                     System.out.println("is corrupted]");
2828                 }
2829             }
2830
2831             queryProvider2.printDiagnostics();
2832             System.out.println("--------------------------------");
2833         }
2834
2835     }
2836
2837     public void fireStartReadTransaction() {
2838
2839         if (DIAGNOSTICS)
2840             System.out.println("StartReadTransaction");
2841
2842         for (SessionEventListener listener : eventListeners) {
2843             try {
2844                 listener.readTransactionStarted();
2845             } catch (Throwable t) {
2846                 t.printStackTrace();
2847             }
2848         }
2849
2850     }
2851
2852     public void fireFinishReadTransaction() {
2853
2854         if (DIAGNOSTICS)
2855             System.out.println("FinishReadTransaction");
2856
2857         for (SessionEventListener listener : eventListeners) {
2858             try {
2859                 listener.readTransactionFinished();
2860             } catch (Throwable t) {
2861                 t.printStackTrace();
2862             }
2863         }
2864
2865     }
2866
2867     public void fireStartWriteTransaction() {
2868
2869         if (DIAGNOSTICS)
2870             System.out.println("StartWriteTransaction");
2871
2872         for (SessionEventListener listener : eventListeners) {
2873             try {
2874                 listener.writeTransactionStarted();
2875             } catch (Throwable t) {
2876                 t.printStackTrace();
2877             }
2878         }
2879
2880     }
2881
2882     public void fireFinishWriteTransaction() {
2883
2884         if (DIAGNOSTICS)
2885             System.out.println("FinishWriteTransaction");
2886
2887         for (SessionEventListener listener : eventListeners) {
2888             try {
2889                 listener.writeTransactionFinished();
2890             } catch (Throwable t) {
2891                 t.printStackTrace();
2892             }
2893         }
2894
2895         Indexing.resetDependenciesIndexingDisabled();
2896
2897     }
2898
2899     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2900         throw new Error("Not supported at the moment.");
2901     }
2902
2903     State getState() {
2904         return state;
2905     }
2906
2907     ClusterTable getClusterTable() {
2908         return clusterTable;
2909     }
2910
2911     public GraphSession getGraphSession() {
2912         return graphSession;
2913     }
2914
2915     public QueryProcessor getQueryProvider2() {
2916         return queryProvider2;
2917     }
2918
2919     ClientChangesImpl getClientChanges() {
2920         return clientChanges;
2921     }
2922
2923     boolean getWriteOnly() {
2924         return writeOnly;
2925     }
2926
2927     static int counter = 0;
2928
2929     public void onClusterLoaded(long clusterId) {
2930
2931         clusterTable.updateSize();
2932
2933     }
2934
2935
2936
2937
2938     /*
2939      * Implementation of the interface RequestProcessor
2940      */
2941
2942     @Override
2943     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2944
2945         assertNotSession();
2946         assertAlive();
2947
2948         assert(request != null);
2949
2950         final DataContainer<T> result = new DataContainer<T>();
2951         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2952
2953         syncRequest(request, new AsyncProcedure<T>() {
2954
2955             @Override
2956             public void execute(AsyncReadGraph graph, T t) {
2957                 result.set(t);
2958             }
2959
2960             @Override
2961             public void exception(AsyncReadGraph graph, Throwable t) {
2962                 exception.set(t);
2963             }
2964
2965         });
2966
2967         Throwable t = exception.get();
2968         if(t != null) {
2969             if(t instanceof DatabaseException) throw (DatabaseException)t;
2970             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2971         }
2972
2973         return result.get();
2974
2975     }
2976
2977 //    @Override
2978 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2979 //        assertNotSession();
2980 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2981 //    }
2982
2983     @Override
2984     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2985         assertNotSession();
2986         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2987     }
2988
2989     @Override
2990     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2991         assertNotSession();
2992         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2993     }
2994
2995     @Override
2996     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2997         assertNotSession();
2998         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2999     }
3000
3001     @Override
3002     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3003         assertNotSession();
3004         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3005     }
3006
3007     @Override
3008     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3009
3010         assertNotSession();
3011
3012         assert(request != null);
3013
3014         final DataContainer<T> result = new DataContainer<T>();
3015         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3016
3017         syncRequest(request, new AsyncProcedure<T>() {
3018
3019             @Override
3020             public void execute(AsyncReadGraph graph, T t) {
3021                 result.set(t);
3022             }
3023
3024             @Override
3025             public void exception(AsyncReadGraph graph, Throwable t) {
3026                 exception.set(t);
3027             }
3028
3029         });
3030
3031         Throwable t = exception.get();
3032         if(t != null) {
3033             if(t instanceof DatabaseException) throw (DatabaseException)t;
3034             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3035         }
3036
3037         return result.get();
3038
3039     }
3040
3041     @Override
3042     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3043         assertNotSession();
3044         return syncRequest(request, (AsyncProcedure<T>)procedure);
3045     }
3046
3047     @Override
3048     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3049         assertNotSession();
3050         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3051     }
3052
3053     @Override
3054     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3055         assertNotSession();
3056         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3057     }
3058
3059     @Override
3060     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3061         assertNotSession();
3062         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3063     }
3064
3065     @Override
3066     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3067         assertNotSession();
3068         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3069     }
3070
3071     @Override
3072     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3073
3074         assertNotSession();
3075
3076         assert(request != null);
3077
3078         final ArrayList<T> result = new ArrayList<T>();
3079         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3080
3081         syncRequest(request, new SyncMultiProcedure<T>() {
3082
3083             @Override
3084             public void execute(ReadGraph graph, T t) {
3085                 synchronized(result) {
3086                     result.add(t);
3087                 }
3088             }
3089
3090             @Override
3091             public void finished(ReadGraph graph) {
3092             }
3093
3094             @Override
3095             public void exception(ReadGraph graph, Throwable t) {
3096                 exception.set(t);
3097             }
3098
3099
3100         });
3101
3102         Throwable t = exception.get();
3103         if(t != null) {
3104             if(t instanceof DatabaseException) throw (DatabaseException)t;
3105             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3106         }
3107
3108         return result;
3109
3110     }
3111
3112     @Override
3113     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3114         assertNotSession();
3115         throw new Error("Not implemented!");
3116     }
3117
3118     @Override
3119     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3120         assertNotSession();
3121         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3122     }
3123
3124     @Override
3125     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3126         assertNotSession();
3127         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3128     }
3129
3130     @Override
3131     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3132         assertNotSession();
3133         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3134     }
3135
3136     @Override
3137     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3138
3139         assertNotSession();
3140
3141         assert(request != null);
3142
3143         final ArrayList<T> result = new ArrayList<T>();
3144         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3145
3146         syncRequest(request, new AsyncMultiProcedure<T>() {
3147
3148             @Override
3149             public void execute(AsyncReadGraph graph, T t) {
3150                 synchronized(result) {
3151                     result.add(t);
3152                 }
3153             }
3154
3155             @Override
3156             public void finished(AsyncReadGraph graph) {
3157             }
3158
3159             @Override
3160             public void exception(AsyncReadGraph graph, Throwable t) {
3161                 exception.set(t);
3162             }
3163
3164
3165         });
3166
3167         Throwable t = exception.get();
3168         if (t != null) {
3169             if (t instanceof DatabaseException)
3170                 throw (DatabaseException) t;
3171             else
3172                 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3173         }
3174
3175         return result;
3176
3177     }
3178
3179     @Override
3180     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3181         assertNotSession();
3182         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3183     }
3184
3185     @Override
3186     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3187         assertNotSession();
3188         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3189     }
3190
3191     @Override
3192     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3193         assertNotSession();
3194        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3195     }
3196
3197     @Override
3198     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3199         assertNotSession();
3200         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3201     }
3202
3203     @Override
3204     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3205         assertNotSession();
3206         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3207     }
3208
3209
3210     @Override
3211     public <T> void asyncRequest(Read<T> request) {
3212
3213         asyncRequest(request, new ProcedureAdapter<T>() {
3214             @Override
3215             public void exception(Throwable t) {
3216                 t.printStackTrace();
3217             }
3218         });
3219
3220     }
3221
3222     @Override
3223     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3224         asyncRequest(request, (AsyncProcedure<T>)procedure);
3225     }
3226
3227     @Override
3228     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3229         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3230     }
3231
3232     @Override
3233     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3234         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3235     }
3236
3237     @Override
3238     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3239         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3240     }
3241
3242     @Override
3243     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3244         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3245     }
3246
3247     @Override
3248     final public <T> void asyncRequest(final AsyncRead<T> request) {
3249
3250         assert(request != null);
3251
3252         asyncRequest(request, new ProcedureAdapter<T>() {
3253             @Override
3254             public void exception(Throwable t) {
3255                 t.printStackTrace();
3256             }
3257         });
3258
3259     }
3260
3261     @Override
3262     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3263         scheduleRequest(request, procedure, procedure, null);
3264     }
3265
3266     @Override
3267     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3268         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3269     }
3270
3271     @Override
3272     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3273         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3274     }
3275
3276     @Override
3277     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3278         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3279     }
3280
3281     @Override
3282     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3283         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3284     }
3285
3286     @Override
3287     public <T> void asyncRequest(MultiRead<T> request) {
3288
3289         assert(request != null);
3290
3291         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3292             @Override
3293             public void exception(ReadGraph graph, Throwable t) {
3294                 t.printStackTrace();
3295             }
3296         });
3297
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3303     }
3304
3305     @Override
3306     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3308     }
3309
3310     @Override
3311     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312         scheduleRequest(request, procedure, null);
3313     }
3314
3315     @Override
3316     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3318     }
3319
3320     @Override
3321     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3322
3323         assert(request != null);
3324
3325         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3326             @Override
3327             public void exception(AsyncReadGraph graph, Throwable t) {
3328                 t.printStackTrace();
3329             }
3330         });
3331
3332     }
3333
3334     @Override
3335     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3337     }
3338
3339     @Override
3340     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3347     }
3348
3349     @Override
3350     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3352     }
3353
3354     @Override
3355     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3357     }
3358
3359     @Override
3360     final public <T> void asyncRequest(final ExternalRead<T> request) {
3361
3362         assert(request != null);
3363
3364         asyncRequest(request, new ProcedureAdapter<T>() {
3365             @Override
3366             public void exception(Throwable t) {
3367                 t.printStackTrace();
3368             }
3369         });
3370
3371     }
3372
3373     @Override
3374     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3375         asyncRequest(request, (Procedure<T>)procedure);
3376     }
3377
3378     boolean sameProvider(Write request) {
3379         if(writeState.getGraph().provider != null) {
3380             return writeState.getGraph().provider.equals(request.getProvider());
3381         } else {
3382             return request.getProvider() == null;
3383         }
3384     }
3385
3386     boolean plainWrite(WriteGraphImpl graph) {
3387         if(graph == null) return false;
3388         if(graph.writeSupport.writeOnly()) return false;
3389         return true;
3390     }
3391
3392
3393     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3394
3395     private void assertNotSession() throws DatabaseException {
3396         Thread current = Thread.currentThread();
3397         if (sessionThreads.contains(current))
3398             throw new ServiceException("Caller is already inside a transaction.");
3399     }
3400
3401     void assertAlive() {
3402         if (!state.isAlive())
3403             throw new RuntimeDatabaseException("Session has been shut down.");
3404     }
3405
3406     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3407         return querySupport.getValueStream(graph, querySupport.getId(resource));
3408     }
3409
3410     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3411         return querySupport.getValue(graph, querySupport.getId(resource));
3412     }
3413
3414     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3415         acquire(semaphore, request, null);
3416     }
3417
3418     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3419         assertAlive();
3420         try {
3421             long tryCount = 0;
3422             // Loop until the semaphore is acquired reporting requests that take a long time.
3423             while (true) {
3424
3425                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3426                     // Acquired OK.
3427                     return;
3428                 } else {
3429                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3430                         ++tryCount;
3431                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3432                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3433                                 * tryCount;
3434                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3435                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3436                     }
3437                 }
3438
3439                 assertAlive();
3440
3441             }
3442         } catch (InterruptedException e) {
3443             e.printStackTrace();
3444             // FIXME: Should perhaps do something else in this case ??
3445         }
3446     }
3447
3448
3449
3450 //    @Override
3451 //    public boolean holdOnToTransactionAfterCancel() {
3452 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3453 //    }
3454 //
3455 //    @Override
3456 //    public boolean holdOnToTransactionAfterCommit() {
3457 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3458 //    }
3459 //
3460 //    @Override
3461 //    public boolean holdOnToTransactionAfterRead() {
3462 //        return transactionPolicy.holdOnToTransactionAfterRead();
3463 //    }
3464 //
3465 //    @Override
3466 //    public void onRelinquish() {
3467 //        transactionPolicy.onRelinquish();
3468 //    }
3469 //
3470 //    @Override
3471 //    public void onRelinquishDone() {
3472 //        transactionPolicy.onRelinquishDone();
3473 //    }
3474 //
3475 //    @Override
3476 //    public void onRelinquishError() {
3477 //        transactionPolicy.onRelinquishError();
3478 //    }
3479
3480
3481     @Override
3482     public Session getSession() {
3483         return this;
3484     }
3485
3486
3487     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3488     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3489
3490     public void ceased(int thread) {
3491
3492         requestManager.ceased(thread);
3493
3494     }
3495
3496     public int getAmountOfQueryThreads() {
3497         // This must be a power of two
3498         return 1;
3499 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3500     }
3501
3502     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3503
3504         return new ResourceImpl(resourceSupport, key);
3505
3506     }
3507
3508     public void acquireWriteOnly() {
3509         writeOnly = true;
3510     }
3511
3512     public void releaseWriteOnly(ReadGraphImpl graph) {
3513         writeOnly = false;
3514         queryProvider2.releaseWrite(graph);
3515     }
3516
3517     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3518
3519         long start = System.nanoTime();
3520
3521         while(dirtyPrimitives) {
3522             dirtyPrimitives = false;
3523             getQueryProvider2().propagateChangesInQueryCache(writer);
3524             getQueryProvider2().listening.fireListeners(writer);
3525         }
3526
3527         fireMetadataListeners(writer, clientChanges);
3528
3529         if(DebugPolicy.PERFORMANCE_DATA) {
3530             long end = System.nanoTime();
3531             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3532         }
3533
3534     }
3535
3536     void removeTemporaryData() {
3537         File platform = Platform.getLocation().toFile();
3538         File tempFiles = new File(platform, "tempFiles");
3539         File temp = new File(tempFiles, "db");
3540         if(!temp.exists()) 
3541             return;
3542         try {
3543             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3544                 @Override
3545                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3546                     try {
3547                         Files.delete(file);
3548                     } catch (IOException e) {
3549                         Logger.defaultLogError(e);
3550                     }
3551                     return FileVisitResult.CONTINUE;
3552                 }
3553             });
3554         } catch (IOException e) {
3555             Logger.defaultLogError(e);
3556         }
3557     }
3558
3559     public void handleCreatedClusters() {
3560         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3561             createdClusters.forEach(new TLongProcedure() {
3562
3563                 @Override
3564                 public boolean execute(long value) {
3565                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3566                     cluster.setImmutable(true, clusterTranslator);
3567                     return true;
3568                 }
3569             });
3570         }
3571         createdClusters.clear();
3572     }
3573
3574     @Override
3575     public Object getModificationCounter() {
3576         return queryProvider2.modificationCounter;
3577     }
3578     
3579     @Override
3580     public void markUndoPoint() {
3581         state.setCombine(false);
3582     }
3583
3584     @SuppressWarnings("unchecked")
3585     @Override
3586     public <T> T l0() {
3587         return (T) L0;
3588     }
3589
3590 }