]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
AsyncBarrier.dec runs into refcounting problem
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2018 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.ReadGraph;
43 import org.simantics.db.Resource;
44 import org.simantics.db.ResourceSerializer;
45 import org.simantics.db.Session;
46 import org.simantics.db.SessionManager;
47 import org.simantics.db.SessionVariables;
48 import org.simantics.db.VirtualGraph;
49 import org.simantics.db.WriteGraph;
50 import org.simantics.db.authentication.UserAuthenticationAgent;
51 import org.simantics.db.authentication.UserAuthenticator;
52 import org.simantics.db.common.Indexing;
53 import org.simantics.db.common.TransactionPolicyRelease;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.adapter.SyncMultiProcedureAdapter;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
60 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
61 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.NoneToSyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
64 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
65 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
66 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
67 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
68 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
69 import org.simantics.db.common.utils.Logger;
70 import org.simantics.db.event.ChangeEvent;
71 import org.simantics.db.event.ChangeListener;
72 import org.simantics.db.event.SessionEventListener;
73 import org.simantics.db.exception.CancelTransactionException;
74 import org.simantics.db.exception.ClusterSetExistException;
75 import org.simantics.db.exception.DatabaseException;
76 import org.simantics.db.exception.ImmutableException;
77 import org.simantics.db.exception.InvalidResourceReferenceException;
78 import org.simantics.db.exception.ResourceNotFoundException;
79 import org.simantics.db.exception.RuntimeDatabaseException;
80 import org.simantics.db.exception.ServiceException;
81 import org.simantics.db.exception.ServiceNotFoundException;
82 import org.simantics.db.impl.BlockingAsyncProcedure;
83 import org.simantics.db.impl.ClusterBase;
84 import org.simantics.db.impl.ClusterI;
85 import org.simantics.db.impl.ClusterTraitsBase;
86 import org.simantics.db.impl.ClusterTranslator;
87 import org.simantics.db.impl.ResourceImpl;
88 import org.simantics.db.impl.TransientGraph;
89 import org.simantics.db.impl.VirtualGraphImpl;
90 import org.simantics.db.impl.graph.DelayedWriteGraph;
91 import org.simantics.db.impl.graph.ReadGraphImpl;
92 import org.simantics.db.impl.graph.WriteGraphImpl;
93 import org.simantics.db.impl.graph.WriteSupport;
94 import org.simantics.db.impl.internal.RandomAccessValueSupport;
95 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
96 import org.simantics.db.impl.procedure.ResultCallWrappedSyncQueryProcedure;
97 import org.simantics.db.impl.query.QueryCache;
98 import org.simantics.db.impl.query.QueryCacheBase;
99 import org.simantics.db.impl.query.QueryProcessor;
100 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
101 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
102 import org.simantics.db.impl.service.QueryDebug;
103 import org.simantics.db.impl.support.VirtualGraphServerSupport;
104 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
105 import org.simantics.db.procedure.AsyncListener;
106 import org.simantics.db.procedure.AsyncMultiListener;
107 import org.simantics.db.procedure.AsyncMultiProcedure;
108 import org.simantics.db.procedure.AsyncProcedure;
109 import org.simantics.db.procedure.Listener;
110 import org.simantics.db.procedure.ListenerBase;
111 import org.simantics.db.procedure.MultiListener;
112 import org.simantics.db.procedure.MultiProcedure;
113 import org.simantics.db.procedure.Procedure;
114 import org.simantics.db.procedure.SyncListener;
115 import org.simantics.db.procedure.SyncMultiListener;
116 import org.simantics.db.procedure.SyncMultiProcedure;
117 import org.simantics.db.procedure.SyncProcedure;
118 import org.simantics.db.procore.cluster.ClusterImpl;
119 import org.simantics.db.procore.cluster.ClusterTraits;
120 import org.simantics.db.procore.protocol.Constants;
121 import org.simantics.db.procore.protocol.DebugPolicy;
122 import org.simantics.db.request.AsyncMultiRead;
123 import org.simantics.db.request.AsyncRead;
124 import org.simantics.db.request.DelayedWrite;
125 import org.simantics.db.request.DelayedWriteResult;
126 import org.simantics.db.request.ExternalRead;
127 import org.simantics.db.request.MultiRead;
128 import org.simantics.db.request.Read;
129 import org.simantics.db.request.ReadInterface;
130 import org.simantics.db.request.Write;
131 import org.simantics.db.request.WriteInterface;
132 import org.simantics.db.request.WriteOnly;
133 import org.simantics.db.request.WriteOnlyResult;
134 import org.simantics.db.request.WriteResult;
135 import org.simantics.db.request.WriteTraits;
136 import org.simantics.db.service.ByteReader;
137 import org.simantics.db.service.ClusterBuilder;
138 import org.simantics.db.service.ClusterBuilderFactory;
139 import org.simantics.db.service.ClusterControl;
140 import org.simantics.db.service.ClusterSetsSupport;
141 import org.simantics.db.service.ClusterUID;
142 import org.simantics.db.service.ClusteringSupport;
143 import org.simantics.db.service.CollectionSupport;
144 import org.simantics.db.service.DebugSupport;
145 import org.simantics.db.service.DirectQuerySupport;
146 import org.simantics.db.service.EventSupport;
147 import org.simantics.db.service.GraphChangeListenerSupport;
148 import org.simantics.db.service.InitSupport;
149 import org.simantics.db.service.LifecycleSupport;
150 import org.simantics.db.service.ManagementSupport;
151 import org.simantics.db.service.QueryControl;
152 import org.simantics.db.service.SerialisationSupport;
153 import org.simantics.db.service.ServerInformation;
154 import org.simantics.db.service.ServiceActivityMonitor;
155 import org.simantics.db.service.SessionEventSupport;
156 import org.simantics.db.service.SessionMonitorSupport;
157 import org.simantics.db.service.SessionUserSupport;
158 import org.simantics.db.service.StatementSupport;
159 import org.simantics.db.service.TransactionPolicySupport;
160 import org.simantics.db.service.TransactionSupport;
161 import org.simantics.db.service.TransferableGraphSupport;
162 import org.simantics.db.service.UndoRedoSupport;
163 import org.simantics.db.service.VirtualGraphSupport;
164 import org.simantics.db.service.XSupport;
165 import org.simantics.layer0.Layer0;
166 import org.simantics.utils.DataContainer;
167 import org.simantics.utils.Development;
168 import org.simantics.utils.threads.logger.ITask;
169 import org.simantics.utils.threads.logger.ThreadLogger;
170 import org.slf4j.LoggerFactory;
171
172 import gnu.trove.procedure.TLongProcedure;
173 import gnu.trove.set.hash.TLongHashSet;
174
175
176 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
177
178     private static org.slf4j.Logger LOGGER = LoggerFactory.getLogger(SessionImplSocket.class);
179
180     protected static final boolean DEBUG        = false;
181
182     private static final boolean DIAGNOSTICS       = false;
183
184     private TransactionPolicySupport transactionPolicy;
185
186     final private ClusterControl clusterControl;
187
188     final protected BuiltinSupportImpl builtinSupport;
189     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
190     final protected ClusterSetsSupport clusterSetsSupport;
191     final protected LifecycleSupportImpl lifecycleSupport;
192
193     protected QuerySupportImpl querySupport;
194     protected ResourceSupportImpl resourceSupport;
195     protected WriteSupport writeSupport;
196     public ClusterTranslator clusterTranslator;
197
198     boolean dirtyPrimitives = false;
199
200     public static final int SERVICE_MODE_CREATE = 2;
201     public static final int SERVICE_MODE_ALLOW = 1;
202     public int serviceMode = 0;
203     public boolean createdImmutableClusters = false;
204     public TLongHashSet createdClusters = new TLongHashSet();
205
206     private Layer0 L0;
207
208     /**
209      * The service locator maintained by the workbench. These services are
210      * initialized during workbench during the <code>init</code> method.
211      */
212     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
213
214     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
215
216     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
217
218     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
219
220     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
221
222     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
223
224     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
225
226     final protected State                                        state              = new State();
227
228     protected GraphSession                                       graphSession       = null;
229
230     protected SessionManager                                     sessionManagerImpl = null;
231
232     protected UserAuthenticationAgent                            authAgent          = null;
233
234     protected UserAuthenticator                                  authenticator      = null;
235
236     protected Resource                                           user               = null;
237
238     protected ClusterStream                                      clusterStream      = null;
239
240     protected SessionRequestManager                         requestManager = null;
241
242     public ClusterTable                                       clusterTable       = null;
243
244     public QueryProcessor                                     queryProvider2 = null;
245
246     ClientChangesImpl                                  clientChanges      = null;
247
248     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
249
250 //    protected long                                               newClusterId       = Constants.NullClusterId;
251
252     protected int                                                flushCounter       = 0;
253
254     protected boolean                                            writeOnly          = false;
255
256     WriteState<?>                                                writeState = null;
257     WriteStateBase<?>                                            delayedWriteState = null;
258     protected Resource defaultClusterSet = null; // If not null then used for newResource().
259
260     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
261
262 //        if (authAgent == null)
263 //            throw new IllegalArgumentException("null authentication agent");
264
265         File t = StaticSessionProperties.virtualGraphStoragePath;
266         if (null == t)
267             t = new File(".");
268         this.clusterTable = new ClusterTable(this, t);
269         this.builtinSupport = new BuiltinSupportImpl(this);
270         this.sessionManagerImpl = sessionManagerImpl;
271         this.user = null;
272 //        this.authAgent = authAgent;
273
274         serviceLocator.registerService(Session.class, this);
275         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
276         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
277         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
278         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
279         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
280         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
281         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
282         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
283         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
284         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
285         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
286         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
287         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
288         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
289         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
290         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
291         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
292         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
293         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
294         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
295         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
296         serviceLocator.registerService(EventSupport.class, new EventSupportImpl());
297         ServiceActivityUpdaterForWriteTransactions.register(this);
298
299         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
300         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
301         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
302         this.lifecycleSupport = new LifecycleSupportImpl(this);
303         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
304         this.transactionPolicy = new TransactionPolicyRelease();
305         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
306         this.clusterControl = new ClusterControlImpl(this);
307         serviceLocator.registerService(ClusterControl.class, clusterControl);
308         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
309         this.clusterSetsSupport.setReadDirectory(t.toPath());
310         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
311         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
312     }
313
314     /*
315      *
316      * Session interface
317      */
318
319
320     @Override
321     public Resource getRootLibrary() {
322         return queryProvider2.getRootLibraryResource();
323     }
324
325     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
326         if (!graphSession.dbSession.refreshEnabled())
327             return;
328         try {
329             getClusterTable().refresh(csid, this, clusterUID);
330         } catch (Throwable t) {
331             Logger.defaultLogError("Refesh failed.", t);
332         }
333     }
334
335     static final class TaskHelper {
336         private final String name;
337         private Object result;
338         final Semaphore sema = new Semaphore(0);
339         private Throwable throwable = null;
340         final Consumer<DatabaseException> callback = e -> {
341             synchronized (TaskHelper.this) {
342                 throwable = e;
343             }
344         };
345         final Procedure<Object> proc = new Procedure<Object>() {
346             @Override
347             public void execute(Object result) {
348                 callback.accept(null);
349             }
350             @Override
351             public void exception(Throwable t) {
352                 if (t instanceof DatabaseException)
353                     callback.accept((DatabaseException)t);
354                 else
355                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
356             }
357         };
358         final WriteTraits writeTraits = new WriteTraits() {};
359         TaskHelper(String name) {
360             this.name = name;
361         }
362         @SuppressWarnings("unchecked")
363         <T> T getResult() {
364             return (T)result;
365         }
366         void setResult(Object result) {
367             this.result = result;
368         }
369 //        Throwable throwableGet() {
370 //            return throwable;
371 //        }
372         synchronized void throwableSet(Throwable t) {
373             throwable = t;
374         }
375 //        void throwableSet(String t) {
376 //            throwable = new InternalException("" + name + " operation failed. " + t);
377 //        }
378         synchronized void throwableCheck()
379         throws DatabaseException {
380             if (null != throwable)
381                 if (throwable instanceof DatabaseException)
382                     throw (DatabaseException)throwable;
383                 else
384                     throw new DatabaseException("Undo operation failed.", throwable);
385         }
386         void throw_(String message)
387         throws DatabaseException {
388             throw new DatabaseException("" + name + " operation failed. " + message);
389         }
390     }
391
392
393
394     private ListenerBase getListenerBase(Object procedure) {
395         if (procedure instanceof ListenerBase)
396             return (ListenerBase) procedure;
397         else
398             return null;
399     }
400
401     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
402         scheduleRequest(request, callback, notify, null);
403     }
404
405     /* (non-Javadoc)
406      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
407      */
408     @Override
409     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
410
411         assert (request != null);
412
413         if(Development.DEVELOPMENT) {
414             try {
415             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
416                 System.err.println("schedule write '" + request + "'");
417             } catch (Throwable t) {
418                 Logger.defaultLogError(t);
419             }
420         }
421
422         requestManager.scheduleWrite(new SessionTask(null) {
423
424             @Override
425             public void run0(int thread) {
426
427                 if(Development.DEVELOPMENT) {
428                     try {
429                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
430                         System.err.println("perform write '" + request + "'");
431                     } catch (Throwable t) {
432                         Logger.defaultLogError(t);
433                     }
434                 }
435
436                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
437
438                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
439
440                 try {
441
442                     flushCounter = 0;
443                     Disposable.safeDispose(clientChanges);
444                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
445
446                     VirtualGraph vg = getProvider(request.getProvider());
447
448                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
449                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
450
451                         @Override
452                         public void execute(Object result) {
453                             if(callback != null) callback.accept(null);
454                         }
455
456                         @Override
457                         public void exception(Throwable t) {
458                             if(callback != null) callback.accept((DatabaseException)t);
459                         }
460
461                     });
462
463                     assert (null != writer);
464                     writer.asyncBarrier.inc();
465
466                     try {
467                         request.perform(writer);
468                         assert (null != writer);
469                     } catch (Throwable t) {
470                         if (!(t instanceof CancelTransactionException))
471                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
472                         writeState.except(t);
473                     } finally {
474                         writer.asyncBarrier.dec();
475 //                        writer.waitAsync(request);
476                     }
477
478
479                     assert(!queryProvider2.cache.dirty);
480
481                 } catch (Throwable e) {
482
483                     // Log it first, just to be safe that the error is always logged.
484                     if (!(e instanceof CancelTransactionException))
485                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
486
487 //                    writeState.getGraph().state.barrier.dec();
488 //                    writeState.getGraph().waitAsync(request);
489
490                     writeState.except(e);
491
492 //                    try {
493 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
494 //                        // All we can do here is to log those, can't really pass them anywhere.
495 //                        if (callback != null) {
496 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
497 //                            else callback.run(new DatabaseException(e));
498 //                        }
499 //                    } catch (Throwable e2) {
500 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
501 //                    }
502 //                    boolean empty = clusterStream.reallyFlush();
503 //                    int callerThread = -1;
504 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
505 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
506 //                    try {
507 //                        // Send all local changes to server so it can calculate correct reverse change set.
508 //                        // This will call clusterStream.accept().
509 //                        state.cancelCommit(context, clusterStream);
510 //                        if (!empty) {
511 //                            if (!context.isOk()) // this is a blocking operation
512 //                                throw new InternalException("Cancel failed. This should never happen.");
513 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
514 //                        }
515 //                        state.cancelCommit2(context, clusterStream);
516 //                    } catch (DatabaseException e3) {
517 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
518 //                    } finally {
519 //                        clusterStream.setOff(false);
520 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
521 //                    }
522
523                 } finally {
524
525                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
526
527                 }
528
529                 task.finish();
530
531                 if(Development.DEVELOPMENT) {
532                     try {
533                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
534                         System.err.println("finish write '" + request + "'");
535                     } catch (Throwable t) {
536                         Logger.defaultLogError(t);
537                     }
538                 }
539
540             }
541
542         }, combine);
543
544     }
545
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
547         scheduleRequest(request, procedure, notify, null);
548     }
549
550     /* (non-Javadoc)
551      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
552      */
553     @Override
554     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
555
556         assert (request != null);
557
558         requestManager.scheduleWrite(new SessionTask(null) {
559
560             @Override
561             public void run0(int thread) {
562
563                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
564
565                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
566
567                 flushCounter = 0;
568                 Disposable.safeDispose(clientChanges);
569                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
570
571                 VirtualGraph vg = getProvider(request.getProvider());
572                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
573
574                 WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
575                 writeState = writeStateT;
576                 assert (null != writer);
577
578                 try {
579                     writer.asyncBarrier.inc();
580                     writeStateT.setResult(request.perform(writer));
581                     assert (null != writer);
582                 } catch (Throwable e) {
583                     writeState.except(e);
584                 } finally {
585                     writer.asyncBarrier.dec();
586                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
587                 }
588
589                 task.finish();
590
591             }
592
593         }, combine);
594
595     }
596
597     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
598         scheduleRequest(request, callback, notify, null);
599     }
600
    /* (non-Javadoc)
     * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.DelayedWrite, java.util.function.Consumer, java.util.concurrent.Semaphore, java.lang.Boolean)
     */
604     @Override
605     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
606
607         final ITask total = ThreadLogger.task("ScheduleDelayedWrite");
608
609         assert (request != null);
610
611         requestManager.scheduleWrite(new SessionTask(null) {
612
613             @Override
614             public void run0(int thread) {
615                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
616
617                 Procedure<Object> stateProcedure = new Procedure<Object>() {
618                     @Override
619                     public void execute(Object result) {
620                         if (callback != null)
621                             callback.accept(null);
622                     }
623                     @Override
624                     public void exception(Throwable t) {
625                         if (callback != null) {
626                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
627                             else callback.accept(new DatabaseException(t));
628                         } else
629                             Logger.defaultLogError("Unhandled exception", t);
630                     }
631                 };
632
633                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
634                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
635                 DelayedWriteGraph dwg = null;
636 //                newGraph.state.barrier.inc();
637
638                 try {
639                     dwg = new DelayedWriteGraph(newGraph);
640                     request.perform(dwg);
641                 } catch (Throwable e) {
642                     delayedWriteState.except(e);
643                     total.finish();
644                     dwg.close();
645                     return;
646                 } finally {
647 //                    newGraph.state.barrier.dec();
648 //                    newGraph.waitAsync(request);
649                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
650                 }
651
652                 delayedWriteState = null;
653
654                 ITask task2 = ThreadLogger.task("DelayedWriteCommit");
655                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
656
657                 flushCounter = 0;
658                 Disposable.safeDispose(clientChanges);
659                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
660
661                 acquireWriteOnly();
662
663                 VirtualGraph vg = getProvider(request.getProvider());
664                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
665
666                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
667
668                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
669
670                 assert (null != writer);
671 //                writer.state.barrier.inc();
672
673                 try {
674                     // Cannot cancel
675                     dwg.commit(writer, request);
676
677                         if(defaultClusterSet != null) {
678                                 XSupport xs = getService(XSupport.class);
679                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
680                                 ClusteringSupport cs = getService(ClusteringSupport.class);
681                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
682                         }
683
684                     // This makes clusters available from server
685                     clusterStream.reallyFlush();
686
687                     releaseWriteOnly(writer);
688                     handleUpdatesAndMetadata(writer);
689
690                 } catch (ServiceException e) {
691 //                    writer.state.barrier.dec();
692 //                    writer.waitAsync(null);
693
694                     // These shall be requested from server
695                     clusterTable.removeWriteOnlyClusters();
696                     // This makes clusters available from server
697                     clusterStream.reallyFlush();
698
699                     releaseWriteOnly(writer);
700                     writeState.except(e);
701                 } finally {
702                     // Debugging & Profiling
703                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
704                     task2.finish();
705                     total.finish();
706                 }
707             }
708
709         }, combine);
710
711     }
712
713     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
714         scheduleRequest(request, procedure, notify, null);
715     }
716
717     /* (non-Javadoc)
718      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
719      */
720     @Override
721     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
722         throw new Error("Not implemented");
723     }
724
725     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
726         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
727         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
728             createdClusters.add(cluster.clusterId);
729         }
730         return cluster;
731     }
732
733     class WriteOnlySupport implements WriteSupport {
734
735         ClusterStream stream;
736         ClusterImpl currentCluster;
737
738         public WriteOnlySupport() {
739             this.stream = clusterStream;
740         }
741
742         @Override
743         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
744             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
745         }
746
747         @Override
748         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
749
750             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
751
752             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
753                 if(s != queryProvider2.getRootLibrary())
754                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
755
756             try {
757                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
758             } catch (DatabaseException e) {
759                 Logger.defaultLogError(e);
760                 //throw e;
761             }
762
763             clientChanges.invalidate(s);
764
765             if (cluster.isWriteOnly())
766                 return;
767             queryProvider2.updateStatements(s, p);
768
769         }
770
771         @Override
772         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
773             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
774         }
775
776         @Override
777         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
778
779             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
780             try {
781                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
782             } catch (DatabaseException e) {
783                 Logger.defaultLogError(e);
784             }
785
786             clientChanges.invalidate(rid);
787
788             if (cluster.isWriteOnly())
789                 return;
790             queryProvider2.updateValue(rid);
791
792         }
793
794         @Override
795         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
796
797             if(amount < 65536) {
798                 claimValue(provider,resource, reader.readBytes(null, amount));
799                 return;
800             }
801
802             byte[] bytes = new byte[65536];
803
804             int rid = ((ResourceImpl)resource).id;
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 int left = amount;
808                 while(left > 0) {
809                     int block = Math.min(left, 65536);
810                     reader.readBytes(bytes, block);
811                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
812                     left -= block;
813                 }
814             } catch (DatabaseException e) {
815                 Logger.defaultLogError(e);
816             }
817
818             clientChanges.invalidate(rid);
819
820             if (cluster.isWriteOnly())
821                 return;
822             queryProvider2.updateValue(rid);
823
824         }
825
826         private void maintainCluster(ClusterImpl before, ClusterI after_) {
827             if(after_ != null && after_ != before) {
828                 ClusterImpl after = (ClusterImpl)after_;
829                 if(currentCluster == before) {
830                     currentCluster = after;
831                 }
832                 clusterTable.replaceCluster(after);
833             }
834         }
835
836         public int createResourceKey(int foreignCounter) throws DatabaseException {
837             if(currentCluster == null) {
838                 currentCluster = getNewResourceCluster();
839             }
840             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
841                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
842                 newCluster.foreignLookup = new byte[foreignCounter];
843                 currentCluster = newCluster;
844                 if (DEBUG)
845                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
846             }
847             return currentCluster.createResource(clusterTranslator);
848         }
849
850         @Override
851         public Resource createResource(VirtualGraph provider) throws DatabaseException {
852             if(currentCluster == null) {
853                 if (null != defaultClusterSet) {
854                         ResourceImpl result = getNewResource(defaultClusterSet);
855                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
856                     return result;
857                 } else {
858                     currentCluster = getNewResourceCluster();
859                 }
860             }
861             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
862                 if (null != defaultClusterSet) {
863                         ResourceImpl result = getNewResource(defaultClusterSet);
864                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
865                     return result;
866                 } else {
867                     currentCluster = getNewResourceCluster();
868                 }
869             }
870             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
871         }
872
873         @Override
874         public Resource createResource(VirtualGraph provider, long clusterId)
875         throws DatabaseException {
876             return getNewResource(clusterId);
877         }
878
879         @Override
880         public Resource createResource(VirtualGraph provider, Resource clusterSet)
881         throws DatabaseException {
882             return getNewResource(clusterSet);
883         }
884
885         @Override
886         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
887                 throws DatabaseException {
888             getNewClusterSet(clusterSet);
889         }
890
891         @Override
892         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
893         throws ServiceException {
894             return containsClusterSet(clusterSet);
895         }
896
897         public void selectCluster(long cluster) {
898                 currentCluster = clusterTable.getClusterByClusterId(cluster);
899                 long setResourceId = clusterSetsSupport.getSet(cluster);
900                 clusterSetsSupport.put(setResourceId, cluster);
901         }
902
903         @Override
904         public Resource setDefaultClusterSet(Resource clusterSet)
905         throws ServiceException {
906                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
907                 if(clusterSet != null) {
908                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
909                         currentCluster = clusterTable.getClusterByClusterId(id);
910                         return result;
911                 } else {
912                         currentCluster = null;
913                         return null;
914                 }
915         }
916
917         @Override
918         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
919
920                  provider = getProvider(provider);
921              if (null == provider) {
922                  int key = ((ResourceImpl)resource).id;
923
924
925
926                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
927 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
928 //                      if(key != queryProvider2.getRootLibrary())
929 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
930
931 //                 try {
932 //                     cluster.removeValue(key, clusterTranslator);
933 //                 } catch (DatabaseException e) {
934 //                     Logger.defaultLogError(e);
935 //                     return;
936 //                 }
937
938                  clusterTable.writeOnlyInvalidate(cluster);
939
940                  try {
941                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
942                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
943                          clusterTranslator.removeValue(cluster);
944                  } catch (DatabaseException e) {
945                          Logger.defaultLogError(e);
946                  }
947
948                  queryProvider2.invalidateResource(key);
949                  clientChanges.invalidate(key);
950
951              } else {
952                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
953                  queryProvider2.updateValue(querySupport.getId(resource));
954                  clientChanges.claimValue(resource);
955              }
956
957
958         }
959
960         @Override
961         public void flush(boolean intermediate) {
962             throw new UnsupportedOperationException();
963         }
964
965         @Override
966         public void flushCluster() {
967             clusterTable.flushCluster(graphSession);
968             if(defaultClusterSet != null) {
969                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
970             }
971             currentCluster = null;
972         }
973
974         @Override
975         public void flushCluster(Resource r) {
976             throw new UnsupportedOperationException("flushCluster resource " + r);
977         }
978
979         @Override
980         public void gc() {
981         }
982
983         @Override
984         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
985                 Resource object) {
986
987                 int s = ((ResourceImpl)subject).id;
988                 int p = ((ResourceImpl)predicate).id;
989                 int o = ((ResourceImpl)object).id;
990
991                 provider = getProvider(provider);
992                 if (null == provider) {
993
994                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
995                         clusterTable.writeOnlyInvalidate(cluster);
996
997                         try {
998
999                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1000
1001                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1002                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1003
1004                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1005                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1006                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1007                                 clusterTranslator.removeStatement(cluster);
1008
1009                                 queryProvider2.invalidateResource(s);
1010                                 clientChanges.invalidate(s);
1011
1012                         } catch (DatabaseException e) {
1013
1014                                 Logger.defaultLogError(e);
1015
1016                         }
1017
1018                         return true;
1019
1020                 } else {
1021                         
1022                         ((VirtualGraphImpl)provider).deny(s, p, o);
1023                         queryProvider2.invalidateResource(s);
1024                         clientChanges.invalidate(s);
1025
1026                         return true;
1027
1028                 }
1029
1030         }
1031
1032         @Override
1033         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1034             throw new UnsupportedOperationException();
1035         }
1036
1037         @Override
1038         public boolean writeOnly() {
1039             return true;
1040         }
1041
1042         @Override
1043         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1044             writeSupport.performWriteRequest(graph, request);
1045         }
1046
1047         @Override
1048         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1049             throw new UnsupportedOperationException();
1050         }
1051
1052         @Override
1053         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1054             throw new UnsupportedOperationException();
1055         }
1056
1057         @Override
1058         public <T> void addMetadata(Metadata data) throws ServiceException {
1059             writeSupport.addMetadata(data);
1060         }
1061
1062         @Override
1063         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1064             return writeSupport.getMetadata(clazz);
1065         }
1066
1067         @Override
1068         public TreeMap<String, byte[]> getMetadata() {
1069             return writeSupport.getMetadata();
1070         }
1071
1072         @Override
1073         public void commitDone(WriteTraits writeTraits, long csid) {
1074             writeSupport.commitDone(writeTraits, csid);
1075         }
1076
1077         @Override
1078         public void clearUndoList(WriteTraits writeTraits) {
1079             writeSupport.clearUndoList(writeTraits);
1080         }
1081         @Override
1082         public int clearMetadata() {
1083             return writeSupport.clearMetadata();
1084         }
1085
1086                 @Override
1087                 public void startUndo() {
1088                         writeSupport.startUndo();
1089                 }
1090     }
1091
1092     class VirtualWriteOnlySupport implements WriteSupport {
1093
1094 //        @Override
1095 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1096 //                Resource object) {
1097 //            throw new UnsupportedOperationException();
1098 //        }
1099
1100         @Override
1101         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1102
1103             TransientGraph impl = (TransientGraph)provider;
1104             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1105             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1106             clientChanges.claim(subject, predicate, object);
1107
1108         }
1109
1110         @Override
1111         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1112
1113             TransientGraph impl = (TransientGraph)provider;
1114             impl.claim(subject, predicate, object);
1115             getQueryProvider2().updateStatements(subject, predicate);
1116             clientChanges.claim(subject, predicate, object);
1117
1118         }
1119
1120         @Override
1121         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1122             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1123         }
1124
1125         @Override
1126         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1127             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1128             getQueryProvider2().updateValue(resource);
1129             clientChanges.claimValue(resource);
1130         }
1131
1132         @Override
1133         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1134             byte[] value = reader.readBytes(null, amount);
1135             claimValue(provider, resource, value);
1136         }
1137
1138         @Override
1139         public Resource createResource(VirtualGraph provider) {
1140             TransientGraph impl = (TransientGraph)provider;
1141             return impl.getResource(impl.newResource(false));
1142         }
1143
1144         @Override
1145         public Resource createResource(VirtualGraph provider, long clusterId) {
1146             throw new UnsupportedOperationException();
1147         }
1148
1149         @Override
1150         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1151         throws DatabaseException {
1152             throw new UnsupportedOperationException();
1153         }
1154
1155         @Override
1156         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1157         throws DatabaseException {
1158             throw new UnsupportedOperationException();
1159         }
1160
1161         @Override
1162         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1163         throws ServiceException {
1164             throw new UnsupportedOperationException();
1165         }
1166
1167         @Override
1168         public Resource setDefaultClusterSet(Resource clusterSet)
1169         throws ServiceException {
1170                 return null;
1171         }
1172
1173         @Override
1174         public void denyValue(VirtualGraph provider, Resource resource) {
1175             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1176             getQueryProvider2().updateValue(querySupport.getId(resource));
1177             // NOTE: this only keeps track of value changes by-resource.
1178             clientChanges.claimValue(resource);
1179         }
1180
1181         @Override
1182         public void flush(boolean intermediate) {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public void flushCluster() {
1188             throw new UnsupportedOperationException();
1189         }
1190
1191         @Override
1192         public void flushCluster(Resource r) {
1193             throw new UnsupportedOperationException("Resource " + r);
1194         }
1195
1196         @Override
1197         public void gc() {
1198         }
1199
1200         @Override
1201         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1202             TransientGraph impl = (TransientGraph) provider;
1203             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1204             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1205             clientChanges.deny(subject, predicate, object);
1206             return true;
1207         }
1208
1209         @Override
1210         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1211             throw new UnsupportedOperationException();
1212         }
1213
1214         @Override
1215         public boolean writeOnly() {
1216             return true;
1217         }
1218
1219         @Override
1220         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1221             throw new UnsupportedOperationException();
1222         }
1223
1224         @Override
1225         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1226             throw new UnsupportedOperationException();
1227         }
1228
1229         @Override
1230         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1231             throw new UnsupportedOperationException();
1232         }
1233
1234         @Override
1235         public <T> void addMetadata(Metadata data) throws ServiceException {
1236             throw new UnsupportedOperationException();
1237         }
1238
1239         @Override
1240         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1241             throw new UnsupportedOperationException();
1242         }
1243
1244         @Override
1245         public TreeMap<String, byte[]> getMetadata() {
1246             throw new UnsupportedOperationException();
1247         }
1248
1249         @Override
1250         public void commitDone(WriteTraits writeTraits, long csid) {
1251         }
1252
1253         @Override
1254         public void clearUndoList(WriteTraits writeTraits) {
1255         }
1256
1257         @Override
1258         public int clearMetadata() {
1259             return 0;
1260         }
1261
1262                 @Override
1263                 public void startUndo() {
1264                 }
1265     }
1266
1267     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1268
1269         try {
1270
1271             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1272
1273             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1274
1275             flushCounter = 0;
1276             Disposable.safeDispose(clientChanges);
1277             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1278
1279             acquireWriteOnly();
1280
1281             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1282
1283             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1284
1285             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1286             writeState = writeStateT;
1287
1288             assert (null != writer);
1289 //            writer.state.barrier.inc();
1290             long start = System.nanoTime();
1291             T result = request.perform(writer);
1292             long duration = System.nanoTime() - start;
1293             if (DEBUG) {
1294                 System.err.println("################");
1295                 System.err.println("WriteOnly duration " + 1e-9*duration);
1296             }
1297             writeStateT.setResult(result);
1298
1299             // This makes clusters available from server
1300             clusterStream.reallyFlush();
1301
1302             // This will trigger query updates
1303             releaseWriteOnly(writer);
1304             assert (null != writer);
1305
1306             handleUpdatesAndMetadata(writer);
1307
1308         } catch (CancelTransactionException e) {
1309
1310             releaseWriteOnly(writeState.getGraph());
1311
1312             clusterTable.removeWriteOnlyClusters();
1313             state.stopWriteTransaction(clusterStream);
1314
1315         } catch (Throwable e) {
1316
1317             e.printStackTrace();
1318
1319             releaseWriteOnly(writeState.getGraph());
1320
1321             clusterTable.removeWriteOnlyClusters();
1322
1323             if (callback != null)
1324                 callback.exception(new DatabaseException(e));
1325
1326             state.stopWriteTransaction(clusterStream);
1327             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1328
1329         } finally  {
1330
1331             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1332
1333         }
1334
1335     }
1336
1337     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1338         scheduleRequest(request, callback, notify, null);
1339     }
1340
1341     @Override
1342     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1343
1344         assertAlive();
1345
1346         assert (request != null);
1347
1348         requestManager.scheduleWrite(new SessionTask(null) {
1349
1350             @Override
1351             public void run0(int thread) {
1352
1353                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1354
1355                     try {
1356
1357                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1358
1359                         flushCounter = 0;
1360                         Disposable.safeDispose(clientChanges);
1361                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1362
1363                         acquireWriteOnly();
1364
1365                         VirtualGraph vg = getProvider(request.getProvider());
1366                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1367
1368                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1369
1370                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1371
1372                             @Override
1373                             public void execute(Object result) {
1374                                 if(callback != null) callback.accept(null);
1375                             }
1376
1377                             @Override
1378                             public void exception(Throwable t) {
1379                                 if(callback != null) callback.accept((DatabaseException)t);
1380                             }
1381
1382                         });
1383
1384                         assert (null != writer);
1385 //                        writer.state.barrier.inc();
1386
1387                         try {
1388
1389                             request.perform(writer);
1390
1391                         } catch (Throwable e) {
1392
1393 //                            writer.state.barrier.dec();
1394 //                            writer.waitAsync(null);
1395
1396                             releaseWriteOnly(writer);
1397
1398                             clusterTable.removeWriteOnlyClusters();
1399
1400                             if(!(e instanceof CancelTransactionException)) {
1401                                 if (callback != null)
1402                                     callback.accept(new DatabaseException(e));
1403                             }
1404
1405                             writeState.except(e);
1406
1407                             return;
1408
1409                         }
1410
1411                         // This makes clusters available from server
1412                         boolean empty = clusterStream.reallyFlush();
1413                         // This was needed to make WO requests call metadata listeners.
1414                         // NOTE: the calling event does not contain clientChanges information.
1415                         if (!empty && clientChanges.isEmpty())
1416                             clientChanges.setNotEmpty(true);
1417                         releaseWriteOnly(writer);
1418                         assert (null != writer);
1419                     } finally  {
1420
1421                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1422
1423                     }
1424
1425
1426                 task.finish();
1427
1428             }
1429
1430         }, combine);
1431
1432     }
1433
1434     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1435         scheduleRequest(request, callback, notify, null);
1436     }
1437
1438     /* (non-Javadoc)
1439      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1440      */
1441     @Override
1442     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1443
1444         assert (request != null);
1445
1446         requestManager.scheduleWrite(new SessionTask(null) {
1447
1448             @Override
1449             public void run0(int thread) {
1450
1451                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1452
1453                 performWriteOnly(request, notify, callback);
1454
1455                 task.finish();
1456
1457             }
1458
1459         }, combine);
1460
1461     }
1462
1463     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1464
1465         assert (request != null);
1466         assert (procedure != null);
1467
1468         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1469
1470             @Override
1471             public void run0(int thread) {
1472
1473                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1474
1475                 ITask task = ThreadLogger.task(request);
1476
1477                 ListenerBase listener = getListenerBase(procedure);
1478
1479                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1480
1481                 // This is never synced but increase to prevent it from visiting 0
1482                 newGraph.asyncBarrier.inc();
1483
1484                 try {
1485
1486                     if (listener != null) {
1487
1488                         try {
1489                                 
1490                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1491
1492                                         @Override
1493                                         public void exception(AsyncReadGraph graph, Throwable t) {
1494                                                 procedure.exception(graph, t);
1495                                                 if(throwable != null) {
1496                                                         throwable.set(t);
1497                                                 } else {
1498                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1499                                                 }
1500                                         }
1501
1502                                         @Override
1503                                         public void execute(AsyncReadGraph graph, T t) {
1504                                                 if(result != null) result.set(t);
1505                                                 procedure.execute(graph, t);
1506                                         }
1507
1508                                 };
1509                                 
1510                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1511                                 
1512                         } catch (Throwable t) {
1513                             // This is handled by the AsyncProcedure
1514                                 //Logger.defaultLogError("Internal error", t);
1515                         }
1516
1517                     } else {
1518
1519                         try {
1520
1521                             T t = request.perform(newGraph);
1522
1523                             try {
1524
1525                                 if(result != null) result.set(t);
1526                                 procedure.execute(newGraph, t);
1527
1528                             } catch (Throwable th) {
1529
1530                                 if(throwable != null) {
1531                                     throwable.set(th);
1532                                 } else {
1533                                     Logger.defaultLogError("Unhandled exception", th);
1534                                 }
1535
1536                             }
1537
1538                         } catch (Throwable t) {
1539
1540                             if (DEBUG)
1541                                 t.printStackTrace();
1542
1543                             if(throwable != null) {
1544                                 throwable.set(t);
1545                             } else {
1546                                 Logger.defaultLogError("Unhandled exception", t);
1547                             }
1548
1549                             try {
1550
1551                                 procedure.exception(newGraph, t);
1552
1553                             } catch (Throwable t2) {
1554
1555                                 if(throwable != null) {
1556                                     throwable.set(t2);
1557                                 } else {
1558                                     Logger.defaultLogError("Unhandled exception", t2);
1559                                 }
1560
1561                             }
1562
1563                         }
1564                         
1565                         task.finish();
1566
1567                     }
1568
1569                 } finally {
1570
1571                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1572
1573                 }
1574
1575             }
1576
1577         });
1578
1579     }
1580
1581     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1582
1583         assert (request != null);
1584         assert (procedure != null);
1585
1586         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1587
1588         requestManager.scheduleRead(new SessionRead(null, notify) {
1589
1590             @Override
1591             public void run0(int thread) {
1592
1593                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1594
1595                 ITask task = ThreadLogger.task(request);
1596
1597                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1598                 newGraph.asyncBarrier.inc();
1599
1600                 try {
1601
1602                     if (listener != null) {
1603
1604                         try {
1605                             QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1606                         } catch (DatabaseException e) {
1607                             LOGGER.error("Unhandled query exception", e);
1608                         }
1609
1610                     } else {
1611
1612                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(newGraph, null, procedure, request, true) {
1613
1614                             public void execute(AsyncReadGraph graph_, T result) {
1615                                 task.finish();
1616                                 super.execute(graph_, result);
1617                             }
1618
1619                             public void exception(AsyncReadGraph graph_, Throwable t) {
1620                                 task.finish();
1621                                 super.exception(graph_, t);
1622                             }
1623
1624                         };
1625
1626                         try {
1627                             wrap.performSync(request);
1628                         } catch (DatabaseException e) {
1629                             LOGGER.error("Unhandled query exception", e);
1630                         }
1631
1632                     }
1633
1634                 } finally {
1635
1636                     newGraph.asyncBarrier.dec();
1637
1638                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1639
1640                 }
1641
1642             }
1643
1644         });
1645
1646     }
1647
1648     public <T> void scheduleRequest(final MultiRead<T> request, final SyncMultiProcedure<T> procedure, final Semaphore notify) {
1649
1650         assert (request != null);
1651         assert (procedure != null);
1652
1653         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1654
1655         int sync = notify != null ? thread : -1;
1656
1657         requestManager.scheduleRead(new SessionRead(null, notify) {
1658
1659             @Override
1660             public void run0(int thread) {
1661
1662                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1663
1664                 ListenerBase listener = getListenerBase(procedure);
1665
1666                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1667
1668                 try {
1669
1670                     if (listener != null) {
1671
1672                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1673
1674 //                        newGraph.waitAsync(request);
1675
1676                     } else {
1677
1678                         final ResultCallWrappedSyncQueryProcedure<T> wrapper = new ResultCallWrappedSyncQueryProcedure<T>(procedure);
1679
1680                         try {
1681
1682                             request.perform(newGraph, wrapper);
1683
1684                         } catch (Throwable t) {
1685
1686                             t.printStackTrace();
1687
1688                         }
1689
1690                     }
1691
1692                 } finally {
1693
1694                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1695
1696                 }
1697
1698             }
1699
1700         });
1701
1702     }
1703     
1704     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1705
1706         assert (request != null);
1707         assert (procedure != null);
1708
1709         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1710
1711         int sync = notify != null ? thread : -1;
1712
1713         requestManager.scheduleRead(new SessionRead(null, notify) {
1714
1715             @Override
1716             public void run0(int thread) {
1717
1718                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1719
1720                 ListenerBase listener = getListenerBase(procedure);
1721
1722                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1723
1724                 try {
1725
1726                     if (listener != null) {
1727
1728                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1729
1730 //                        newGraph.waitAsync(request);
1731
1732                     } else {
1733
1734                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1735
1736                         try {
1737
1738                             request.perform(newGraph, wrapper);
1739
1740                         } catch (Throwable t) {
1741
1742                             t.printStackTrace();
1743
1744                         }
1745
1746                     }
1747
1748                 } finally {
1749
1750                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1751
1752                 }
1753
1754             }
1755
1756         });
1757
1758     }
1759
1760     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1761
1762         assert (request != null);
1763         assert (procedure != null);
1764
1765         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1766
1767         requestManager.scheduleRead(new SessionRead(throwable, notify) {
1768
1769             @Override
1770             public void run0(int thread) {
1771
1772                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1773
1774                 ListenerBase listener = getListenerBase(procedure);
1775
1776                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1777
1778                 try {
1779
1780                     if (listener != null) {
1781
1782                         try {
1783                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1784                         } catch (DatabaseException e) {
1785                             Logger.defaultLogError(e);
1786                         }
1787
1788                     } else {
1789
1790 //                        newGraph.state.barrier.inc();
1791
1792                         request.register(newGraph, new Listener<T>() {
1793
1794                             @Override
1795                             public void exception(Throwable t) {
1796                                 if(throwable != null) throwable.set(t);
1797                                 procedure.exception(t);
1798 //                                newGraph.state.barrier.dec();
1799                             }
1800
1801                             @Override
1802                             public void execute(T t) {
1803                                 if(result != null) result.set(t);
1804                                 procedure.execute(t);
1805 //                                newGraph.state.barrier.dec();
1806                             }
1807
1808                             @Override
1809                             public boolean isDisposed() {
1810                                 return true;
1811                             }
1812
1813                         });
1814
1815 //                        newGraph.waitAsync(request);
1816
1817                     }
1818
1819                 } finally {
1820
1821                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1822
1823                 }
1824
1825             }
1826
1827         });
1828
1829     }
1830
1831
1832     @Override
1833     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1834
1835         scheduleRequest(request, procedure, null, null, null);
1836
1837     }
1838
1839     @Override
1840     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1841         assertNotSession();
1842         Semaphore notify = new Semaphore(0);
1843         DataContainer<Throwable> container = new DataContainer<Throwable>();
1844         DataContainer<T> result = new DataContainer<T>();
1845         scheduleRequest(request, procedure, notify, container, result);
1846         acquire(notify, request);
1847         Throwable throwable = container.get();
1848         if(throwable != null) {
1849             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1850             else throw new DatabaseException("Unexpected exception", throwable);
1851         }
1852         return result.get();
1853     }
1854
1855     @Override
1856     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1857         assertNotSession();
1858         Semaphore notify = new Semaphore(0);
1859         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1860         final DataContainer<T> resultContainer = new DataContainer<T>();
1861         scheduleRequest(request, new AsyncProcedure<T>() {
1862             @Override
1863             public void exception(AsyncReadGraph graph, Throwable throwable) {
1864                 exceptionContainer.set(throwable);
1865                 procedure.exception(graph, throwable);
1866             }
1867             @Override
1868             public void execute(AsyncReadGraph graph, T result) {
1869                 resultContainer.set(result);
1870                 procedure.execute(graph, result);
1871             }
1872         }, getListenerBase(procedure), notify);
1873         acquire(notify, request, procedure);
1874         Throwable throwable = exceptionContainer.get();
1875         if (throwable != null) {
1876             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1877             else throw new DatabaseException("Unexpected exception", throwable);
1878         }
1879         return resultContainer.get();
1880     }
1881
1882
1883     @Override
1884     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1885         assertNotSession();
1886         Semaphore notify = new Semaphore(0);
1887         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1888         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1889             @Override
1890             public void exception(AsyncReadGraph graph, Throwable throwable) {
1891                 exceptionContainer.set(throwable);
1892                 procedure.exception(graph, throwable);
1893             }
1894             @Override
1895             public void execute(AsyncReadGraph graph, T result) {
1896                 procedure.execute(graph, result);
1897             }
1898             @Override
1899             public void finished(AsyncReadGraph graph) {
1900                 procedure.finished(graph);
1901             }
1902         }, notify);
1903         acquire(notify, request, procedure);
1904         Throwable throwable = exceptionContainer.get();
1905         if (throwable != null) {
1906             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1907             else throw new DatabaseException("Unexpected exception", throwable);
1908         }
1909         // TODO: implement return value
1910         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1911         return null;
1912     }
1913
1914     @Override
1915     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1916         return syncRequest(request, new ProcedureAdapter<T>());
1917
1918
1919 //        assert(request != null);
1920 //
1921 //        final DataContainer<T> result = new DataContainer<T>();
1922 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1923 //
1924 //        syncRequest(request, new Procedure<T>() {
1925 //
1926 //            @Override
1927 //            public void execute(T t) {
1928 //                result.set(t);
1929 //            }
1930 //
1931 //            @Override
1932 //            public void exception(Throwable t) {
1933 //                exception.set(t);
1934 //            }
1935 //
1936 //        });
1937 //
1938 //        Throwable t = exception.get();
1939 //        if(t != null) {
1940 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1941 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1942 //        }
1943 //
1944 //        return result.get();
1945
1946     }
1947
1948     @Override
1949     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1950         return syncRequest(request, (Procedure<T>)procedure);
1951     }
1952
1953
1954 //    @Override
1955 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1956 //        assertNotSession();
1957 //        Semaphore notify = new Semaphore(0);
1958 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1959 //        DataContainer<T> result = new DataContainer<T>();
1960 //        scheduleRequest(request, procedure, notify, container, result);
1961 //        acquire(notify, request);
1962 //        Throwable throwable = container.get();
1963 //        if(throwable != null) {
1964 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1965 //            else throw new DatabaseException("Unexpected exception", throwable);
1966 //        }
1967 //        return result.get();
1968 //    }
1969
1970
1971     @Override
1972     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1973         assertNotSession();
1974         Semaphore notify = new Semaphore(0);
1975         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1976         final DataContainer<T> result = new DataContainer<T>();
1977         scheduleRequest(request, procedure, notify, container, result);
1978         acquire(notify, request);
1979         Throwable throwable = container.get();
1980         if (throwable != null) {
1981             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1982             else throw new DatabaseException("Unexpected exception", throwable);
1983         }
1984         return result.get();
1985     }
1986
1987     @Override
1988     public void syncRequest(Write request) throws DatabaseException  {
1989         assertNotSession();
1990         assertAlive();
1991         Semaphore notify = new Semaphore(0);
1992         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1993         scheduleRequest(request, e -> exception.set(e), notify);
1994         acquire(notify, request);
1995         if(exception.get() != null) throw exception.get();
1996     }
1997
1998     @Override
1999     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
2000         assertNotSession();
2001         Semaphore notify = new Semaphore(0);
2002         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2003         final DataContainer<T> result = new DataContainer<T>();
2004         scheduleRequest(request, new Procedure<T>() {
2005
2006             @Override
2007             public void exception(Throwable t) {
2008               exception.set(t);
2009             }
2010
2011             @Override
2012             public void execute(T t) {
2013               result.set(t);
2014             }
2015
2016         }, notify);
2017         acquire(notify, request);
2018         if(exception.get() != null) {
2019             Throwable t = exception.get();
2020             if(t instanceof DatabaseException) throw (DatabaseException)t;
2021             else throw new DatabaseException(t);
2022         }
2023         return result.get();
2024     }
2025
2026     @Override
2027     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2028         assertNotSession();
2029         Semaphore notify = new Semaphore(0);
2030         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2031         final DataContainer<T> result = new DataContainer<T>();
2032         scheduleRequest(request, new Procedure<T>() {
2033
2034             @Override
2035             public void exception(Throwable t) {
2036               exception.set(t);
2037             }
2038
2039             @Override
2040             public void execute(T t) {
2041               result.set(t);
2042             }
2043
2044         }, notify);
2045         acquire(notify, request);
2046         if(exception.get() != null) {
2047             Throwable t = exception.get();
2048             if(t instanceof DatabaseException) throw (DatabaseException)t;
2049             else throw new DatabaseException(t);
2050         }
2051         return result.get();
2052     }
2053
2054     @Override
2055     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2056         assertNotSession();
2057         Semaphore notify = new Semaphore(0);
2058         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2059         final DataContainer<T> result = new DataContainer<T>();
2060         scheduleRequest(request, new Procedure<T>() {
2061
2062             @Override
2063             public void exception(Throwable t) {
2064               exception.set(t);
2065             }
2066
2067             @Override
2068             public void execute(T t) {
2069               result.set(t);
2070             }
2071
2072         }, notify);
2073         acquire(notify, request);
2074         if(exception.get() != null) {
2075             Throwable t = exception.get();
2076             if(t instanceof DatabaseException) throw (DatabaseException)t;
2077             else throw new DatabaseException(t);
2078         }
2079         return result.get();
2080     }
2081
2082     @Override
2083     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2084         assertNotSession();
2085         Semaphore notify = new Semaphore(0);
2086         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2087         scheduleRequest(request, e -> exception.set(e), notify);
2088         acquire(notify, request);
2089         if(exception.get() != null) throw exception.get();
2090     }
2091
2092     @Override
2093     public void syncRequest(WriteOnly request) throws DatabaseException  {
2094         assertNotSession();
2095         assertAlive();
2096         Semaphore notify = new Semaphore(0);
2097         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2098         scheduleRequest(request, e -> exception.set(e), notify);
2099         acquire(notify, request);
2100         if(exception.get() != null) throw exception.get();
2101     }
2102
2103     /*
2104      *
2105      * GraphRequestProcessor interface
2106      */
2107
2108     @Override
2109     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2110
2111         scheduleRequest(request, procedure, null, null);
2112
2113     }
2114
2115     @Override
2116     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2117
2118         scheduleRequest(request, procedure, null);
2119
2120     }
2121
2122     @Override
2123     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2124
2125         scheduleRequest(request, procedure, null, null, null);
2126
2127     }
2128
2129     @Override
2130     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2131
2132         scheduleRequest(request, callback, null);
2133
2134     }
2135
2136     @Override
2137     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2138
2139         scheduleRequest(request, procedure, null);
2140
2141     }
2142
2143     @Override
2144     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2145
2146         scheduleRequest(request, procedure, null);
2147
2148     }
2149
2150     @Override
2151     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2152
2153         scheduleRequest(request, procedure, null);
2154
2155     }
2156
2157     @Override
2158     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2159
2160         scheduleRequest(request, callback, null);
2161
2162     }
2163
2164     @Override
2165     public void asyncRequest(final Write r) {
2166         asyncRequest(r, null);
2167     }
2168
2169     @Override
2170     public void asyncRequest(final DelayedWrite r) {
2171         asyncRequest(r, null);
2172     }
2173
2174     @Override
2175     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2176
2177         scheduleRequest(request, callback, null);
2178
2179     }
2180
2181     @Override
2182     public void asyncRequest(final WriteOnly request) {
2183
2184         asyncRequest(request, null);
2185
2186     }
2187
2188     @Override
2189     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2190         r.request(this, procedure);
2191     }
2192
2193     @Override
2194     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2195         r.request(this, procedure);
2196     }
2197
2198     @Override
2199     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2200         r.request(this, procedure);
2201     }
2202
2203     @Override
2204     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2205         r.request(this, procedure);
2206     }
2207
2208     @Override
2209     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2210         r.request(this, procedure);
2211     }
2212
2213     @Override
2214     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2215         r.request(this, procedure);
2216     }
2217
2218     @Override
2219     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2220         return r.request(this);
2221     }
2222
2223     @Override
2224     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2225         return r.request(this);
2226     }
2227
2228     @Override
2229     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2230         r.request(this, procedure);
2231     }
2232
2233     @Override
2234     public <T> void async(WriteInterface<T> r) {
2235         r.request(this, new ProcedureAdapter<T>());
2236     }
2237
2238     //@Override
2239     public void incAsync() {
2240         state.incAsync();
2241     }
2242
2243     //@Override
2244     public void decAsync() {
2245         state.decAsync();
2246     }
2247
2248     public long getCluster(ResourceImpl resource) {
2249         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2250         return cluster.getClusterId();
2251     }
2252
2253     public long getCluster(int id) {
2254         if (clusterTable == null)
2255             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2256         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2257     }
2258
2259     public ResourceImpl getResource(int id) {
2260         return new ResourceImpl(resourceSupport, id);
2261     }
2262
2263     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2264         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2265         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2266         int key = proxy.getClusterKey();
2267         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2268         return new ResourceImpl(resourceSupport, resourceKey);
2269     }
2270
2271     public ResourceImpl getResource2(int id) {
2272         assert (id != 0);
2273         return new ResourceImpl(resourceSupport, id);
2274     }
2275
2276     final public int getId(ResourceImpl impl) {
2277         return impl.id;
2278     }
2279
2280     public static final Charset UTF8 = Charset.forName("utf-8");
2281
2282
2283
2284
2285     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2286         for(TransientGraph g : support.providers) {
2287             if(g.isPending(subject)) return false;
2288         }
2289         return true;
2290     }
2291
2292     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2293         for(TransientGraph g : support.providers) {
2294             if(g.isPending(subject, predicate)) return false;
2295         }
2296         return true;
2297     }
2298
2299     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2300
2301         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2302
2303             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2304
2305             @Override
2306             public void accept(ReadGraphImpl graph) {
2307                 if(ready.decrementAndGet() == 0) {
2308                     runnable.accept(graph);
2309                 }
2310             }
2311
2312         };
2313
2314         for(TransientGraph g : support.providers) {
2315             if(g.isPending(subject)) {
2316                 try {
2317                     g.load(graph, subject, composite);
2318                 } catch (DatabaseException e) {
2319                     e.printStackTrace();
2320                 }
2321             } else {
2322                 composite.accept(graph);
2323             }
2324         }
2325
2326         composite.accept(graph);
2327
2328     }
2329
2330     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2331
2332         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2333
2334             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2335
2336             @Override
2337             public void accept(ReadGraphImpl graph) {
2338                 if(ready.decrementAndGet() == 0) {
2339                     runnable.accept(graph);
2340                 }
2341             }
2342
2343         };
2344
2345         for(TransientGraph g : support.providers) {
2346             if(g.isPending(subject, predicate)) {
2347                 try {
2348                     g.load(graph, subject, predicate, composite);
2349                 } catch (DatabaseException e) {
2350                     e.printStackTrace();
2351                 }
2352             } else {
2353                 composite.accept(graph);
2354             }
2355         }
2356
2357         composite.accept(graph);
2358
2359     }
2360
2361 //    void dumpHeap() {
2362 //
2363 //        try {
2364 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2365 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2366 //                    "d:/heap" + flushCounter + ".txt", true);
2367 //        } catch (IOException e) {
2368 //            e.printStackTrace();
2369 //        }
2370 //
2371 //    }
2372
2373     void fireReactionsToSynchronize(ChangeSet cs) {
2374
2375         // Do not fire empty events
2376         if (cs.isEmpty())
2377             return;
2378
2379         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2380 //        g.state.barrier.inc();
2381
2382         try {
2383
2384             if (!cs.isEmpty()) {
2385                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2386                 for (ChangeListener l : changeListeners2) {
2387                     try {
2388                         l.graphChanged(e2);
2389                     } catch (Exception ex) {
2390                         ex.printStackTrace();
2391                     }
2392                 }
2393             }
2394
2395         } finally {
2396
2397 //            g.state.barrier.dec();
2398 //            g.waitAsync(null);
2399
2400         }
2401
2402     }
2403
2404     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2405         try {
2406
2407             // Do not fire empty events
2408             if (cs2.isEmpty())
2409                 return;
2410
2411 //            graph.restart();
2412 //            graph.state.barrier.inc();
2413
2414             try {
2415
2416                 if (!cs2.isEmpty()) {
2417
2418                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2419                     for (ChangeListener l : changeListeners2) {
2420                         try {
2421                             l.graphChanged(e2);
2422 //                            System.out.println("changelistener " + l);
2423                         } catch (Exception ex) {
2424                             ex.printStackTrace();
2425                         }
2426                     }
2427
2428                 }
2429
2430             } finally {
2431
2432 //                graph.state.barrier.dec();
2433 //                graph.waitAsync(null);
2434
2435             }
2436         } catch (Throwable t) {
2437             t.printStackTrace();
2438
2439         }
2440     }
2441     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2442
2443         try {
2444
2445             // Do not fire empty events
2446             if (cs2.isEmpty())
2447                 return;
2448
2449             // Do not fire on virtual requests
2450             if(graph.getProvider() != null)
2451                 return;
2452
2453             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2454             reactionGraph.asyncBarrier.inc();
2455
2456             try {
2457
2458                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2459                 for (ChangeListener l : metadataListeners) {
2460                     try {
2461                         l.graphChanged(e2);
2462                     } catch (Throwable ex) {
2463                         LOGGER.error("Could not invoke listener {} with event {}", l, e2, ex);
2464                     }
2465                 }
2466
2467             } finally {
2468
2469                 reactionGraph.asyncBarrier.dec();
2470
2471             }
2472
2473         } catch (Throwable t) {
2474             LOGGER.error("Could not fire metadata listeners {} {}", graph, cs2, t);
2475         }
2476
2477     }
2478
2479
2480     /*
2481      * (non-Javadoc)
2482      *
2483      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2484      */
2485     @Override
2486     public <T> T getService(Class<T> api) {
2487         T t = peekService(api);
2488         if (t == null) {
2489             if (state.isClosed())
2490                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2491             throw new ServiceNotFoundException(this, api);
2492         }
2493         return t;
2494     }
2495
    /**
     * Supplies the object that {@link #peekService(Class)} returns when asked
     * for {@code ServerInformation.class}.
     *
     * @return the server information provided by the subclass; presumably a
     *         cached copy, as the method name suggests (not visible here)
     */
    protected abstract ServerInformation getCachedServerInformation();
2500
        // Two-entry cache used by peekService(Class): (serviceKey1, service1) is the
        // entry checked first, (serviceKey2, service2) the one checked second.
        // registerService(Class, Object) refreshes a cached value when its key matches.
        private Class<?> serviceKey1 = null;
        private Class<?> serviceKey2 = null;
        private Object service1 = null;
        private Object service2 = null;
2505
2506     /*
2507      * (non-Javadoc)
2508      *
2509      * @see
2510      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2511      */
2512     @SuppressWarnings("unchecked")
2513     @Override
2514     public synchronized <T> T peekService(Class<T> api) {
2515
2516                 if(serviceKey1 == api) {
2517                         return (T)service1;
2518                 } else if (serviceKey2 == api) {
2519                         // Promote this key
2520                         Object result = service2;
2521                         service2 = service1;
2522                         serviceKey2 = serviceKey1;
2523                         service1 = result;
2524                         serviceKey1 = api;
2525                         return (T)result;
2526                 }
2527
2528         if (Layer0.class == api)
2529                 return (T) L0;
2530         if (ServerInformation.class == api)
2531             return (T) getCachedServerInformation();
2532         else if (WriteGraphImpl.class == api)
2533             return (T) writeState.getGraph();
2534         else if (ClusterBuilder.class == api)
2535             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2536         else if (ClusterBuilderFactory.class == api)
2537             return (T)new ClusterBuilderFactoryImpl(this);
2538
2539                 service2 = service1;
2540                 serviceKey2 = serviceKey1;
2541
2542         service1 = serviceLocator.peekService(api);
2543         serviceKey1 = api;
2544
2545         return (T)service1;
2546
2547     }
2548
2549     /*
2550      * (non-Javadoc)
2551      *
2552      * @see
2553      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2554      */
2555     @Override
2556     public boolean hasService(Class<?> api) {
2557         return serviceLocator.hasService(api);
2558     }
2559
2560     /**
2561      * @param api the api that must be implemented by the specified service
2562      * @param service the service implementation
2563      */
2564     @Override
2565     public <T> void registerService(Class<T> api, T service) {
2566         if(Layer0.class == api) {
2567                 L0 = (Layer0)service;
2568                 return;
2569         }
2570         serviceLocator.registerService(api, service);
2571         if (TransactionPolicySupport.class == api) {
2572             transactionPolicy = (TransactionPolicySupport)service;
2573             state.resetTransactionPolicy();
2574         }
2575         if (api == serviceKey1)
2576                 service1 = service;
2577         else if (api == serviceKey2)
2578                 service2 = service;
2579     }
2580
2581     // ----------------
2582     // SessionMonitor
2583     // ----------------
2584
2585     void fireSessionVariableChange(String variable) {
2586         for (MonitorHandler h : monitorHandlers) {
2587             MonitorContext ctx = monitorContexts.getLeft(h);
2588             assert ctx != null;
2589             // SafeRunner functionality repeated here to avoid dependency.
2590             try {
2591                 h.valuesChanged(ctx);
2592             } catch (Exception e) {
2593                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2594             } catch (LinkageError e) {
2595                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2596             }
2597         }
2598     }
2599
2600
2601     class ResourceSerializerImpl implements ResourceSerializer {
2602
2603         public long createRandomAccessId(int id) throws DatabaseException {
2604             if(id < 0)
2605                 return id;
2606             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2607             long cluster = getCluster(id);
2608             if (0 == cluster)
2609                 return 0; // Better to return 0 then invalid id.
2610             long result = ClusterTraitsBase.createResourceId(cluster, index);
2611             return result;
2612         }
2613
2614         @Override
2615         public long getRandomAccessId(Resource resource) throws DatabaseException {
2616
2617             ResourceImpl resourceImpl = (ResourceImpl) resource;
2618             return createRandomAccessId(resourceImpl.id);
2619
2620         }
2621
2622         @Override
2623         public int getTransientId(Resource resource) throws DatabaseException {
2624
2625             ResourceImpl resourceImpl = (ResourceImpl) resource;
2626             return resourceImpl.id;
2627
2628         }
2629
2630         @Override
2631         public String createRandomAccessId(Resource resource)
2632         throws InvalidResourceReferenceException {
2633
2634             if(resource == null) throw new IllegalArgumentException();
2635
2636             ResourceImpl resourceImpl = (ResourceImpl) resource;
2637
2638             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2639
2640             int r;
2641             try {
2642                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2643             } catch (DatabaseException e1) {
2644                 throw new InvalidResourceReferenceException(e1);
2645             }
2646             try {
2647                 // Serialize as '<resource index>_<cluster id>'
2648                 return "" + r + "_" + getCluster(resourceImpl);
2649             } catch (Throwable e) {
2650                 e.printStackTrace();
2651                 throw new InvalidResourceReferenceException(e);
2652             } finally {
2653             }
2654         }
2655
2656         public int getTransientId(long serialized) throws DatabaseException {
2657             if (serialized <= 0)
2658                 return (int)serialized;
2659             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2660             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2661             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2662             if (cluster == null)
2663                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2664             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2665             return key;
2666         }
2667
2668         @Override
2669         public Resource getResource(long randomAccessId) throws DatabaseException {
2670             return getResourceByKey(getTransientId(randomAccessId));
2671         }
2672
2673         @Override
2674         public Resource getResource(int transientId) throws DatabaseException {
2675             return getResourceByKey(transientId);
2676         }
2677
2678         @Override
2679         public Resource getResource(String randomAccessId)
2680         throws InvalidResourceReferenceException {
2681             try {
2682                 int i = randomAccessId.indexOf('_');
2683                 if (i == -1)
2684                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2685                             + randomAccessId + "'");
2686                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2687                 if(r < 0) return getResourceByKey(r);
2688                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2689                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2690                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2691                 if (cluster.hasResource(key, clusterTranslator))
2692                     return getResourceByKey(key);
2693             } catch (InvalidResourceReferenceException e) {
2694                 throw e;
2695             } catch (NumberFormatException e) {
2696                 throw new InvalidResourceReferenceException(e);
2697             } catch (Throwable e) {
2698                 e.printStackTrace();
2699                 throw new InvalidResourceReferenceException(e);
2700             } finally {
2701             }
2702             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2703         }
2704
2705         @Override
2706         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2707             try {
2708                 return true;
2709             } catch (Throwable e) {
2710                 e.printStackTrace();
2711                 throw new InvalidResourceReferenceException(e);
2712             } finally {
2713             }
2714         }
2715     }
2716
2717     // Copied from old SessionImpl
2718
2719     void check() {
2720         if (state.isClosed())
2721             throw new Error("Session closed.");
2722     }
2723
2724
2725
2726     public ResourceImpl getNewResource(long clusterId) {
2727         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2728         int newId;
2729         try {
2730             newId = cluster.createResource(clusterTranslator);
2731         } catch (DatabaseException e) {
2732             Logger.defaultLogError(e);
2733             return null;
2734         }
2735         return new ResourceImpl(resourceSupport, newId);
2736     }
2737
2738     public ResourceImpl getNewResource(Resource clusterSet)
2739     throws DatabaseException {
2740         long resourceId = clusterSet.getResourceId();
2741         Long clusterId = clusterSetsSupport.get(resourceId);
2742         if (null == clusterId)
2743             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2744         if (Constants.NewClusterId == clusterId) {
2745             clusterId = getService(ClusteringSupport.class).createCluster();
2746             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2747                 createdClusters.add(clusterId);
2748             clusterSetsSupport.put(resourceId, clusterId);
2749             return getNewResource(clusterId);
2750         } else {
2751             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2752             ResourceImpl result;
2753             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2754                 clusterId = getService(ClusteringSupport.class).createCluster();
2755                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2756                     createdClusters.add(clusterId);
2757                 clusterSetsSupport.put(resourceId, clusterId);
2758                 return getNewResource(clusterId);
2759             } else {
2760                 result = getNewResource(clusterId);
2761                 int resultKey = querySupport.getId(result);
2762                 long resultCluster = querySupport.getClusterId(resultKey);
2763                 if (clusterId != resultCluster)
2764                     clusterSetsSupport.put(resourceId, resultCluster);
2765                 return result;
2766             }
2767         }
2768     }
2769
2770     public void getNewClusterSet(Resource clusterSet)
2771     throws DatabaseException {
2772         if (DEBUG)
2773             System.out.println("new cluster set=" + clusterSet);
2774         long resourceId = clusterSet.getResourceId();
2775         if (clusterSetsSupport.containsKey(resourceId))
2776             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2777         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2778     }
2779     public boolean containsClusterSet(Resource clusterSet)
2780     throws ServiceException {
2781         long resourceId = clusterSet.getResourceId();
2782         return clusterSetsSupport.containsKey(resourceId);
2783     }
2784     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2785         Resource r = defaultClusterSet;
2786         defaultClusterSet = clusterSet;
2787         return r;
2788     }
2789     void printDiagnostics() {
2790
2791         if (DIAGNOSTICS) {
2792
2793             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2794             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2795             for(ClusterI cluster : clusterTable.getClusters()) {
2796                 try {
2797                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2798                         residentClusters.set(residentClusters.get() + 1);
2799                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2800                     }
2801                 } catch (DatabaseException e) {
2802                     Logger.defaultLogError(e);
2803                 }
2804             }
2805
2806             System.out.println("--------------------------------");
2807             System.out.println("Cluster information:");
2808             System.out.println("-amount of resident clusters=" + residentClusters.get());
2809             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2810
2811             for(ClusterI cluster : clusterTable.getClusters()) {
2812                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2813                 try {
2814                     if (!cluster.isLoaded())
2815                         System.out.println("not loaded]");
2816                     else if (cluster.isEmpty())
2817                         System.out.println("is empty]");
2818                     else {
2819                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2820                         System.out.print(",references=" + cluster.getReferenceCount());
2821                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2822                     }
2823                 } catch (DatabaseException e) {
2824                     Logger.defaultLogError(e);
2825                     System.out.println("is corrupted]");
2826                 }
2827             }
2828
2829             queryProvider2.printDiagnostics();
2830             System.out.println("--------------------------------");
2831         }
2832
2833     }
2834
2835     public void fireStartReadTransaction() {
2836
2837         if (DIAGNOSTICS)
2838             System.out.println("StartReadTransaction");
2839
2840         for (SessionEventListener listener : eventListeners) {
2841             try {
2842                 listener.readTransactionStarted();
2843             } catch (Throwable t) {
2844                 t.printStackTrace();
2845             }
2846         }
2847
2848     }
2849
2850     public void fireFinishReadTransaction() {
2851
2852         if (DIAGNOSTICS)
2853             System.out.println("FinishReadTransaction");
2854
2855         for (SessionEventListener listener : eventListeners) {
2856             try {
2857                 listener.readTransactionFinished();
2858             } catch (Throwable t) {
2859                 t.printStackTrace();
2860             }
2861         }
2862
2863     }
2864
2865     public void fireStartWriteTransaction() {
2866
2867         if (DIAGNOSTICS)
2868             System.out.println("StartWriteTransaction");
2869
2870         for (SessionEventListener listener : eventListeners) {
2871             try {
2872                 listener.writeTransactionStarted();
2873             } catch (Throwable t) {
2874                 t.printStackTrace();
2875             }
2876         }
2877
2878     }
2879
2880     public void fireFinishWriteTransaction() {
2881
2882         if (DIAGNOSTICS)
2883             System.out.println("FinishWriteTransaction");
2884
2885         for (SessionEventListener listener : eventListeners) {
2886             try {
2887                 listener.writeTransactionFinished();
2888             } catch (Throwable t) {
2889                 t.printStackTrace();
2890             }
2891         }
2892
2893         Indexing.resetDependenciesIndexingDisabled();
2894
2895     }
2896
2897     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2898         throw new Error("Not supported at the moment.");
2899     }
2900
2901     State getState() {
2902         return state;
2903     }
2904
2905     ClusterTable getClusterTable() {
2906         return clusterTable;
2907     }
2908
2909     public GraphSession getGraphSession() {
2910         return graphSession;
2911     }
2912
2913     public QueryProcessor getQueryProvider2() {
2914         return queryProvider2;
2915     }
2916
2917     ClientChangesImpl getClientChanges() {
2918         return clientChanges;
2919     }
2920
2921     boolean getWriteOnly() {
2922         return writeOnly;
2923     }
2924
    // NOTE(review): no use of this counter is visible in the surrounding methods;
    // confirm whether it is still referenced before relying on or removing it.
    static int counter = 0;
2926
2927     public void onClusterLoaded(long clusterId) {
2928
2929         clusterTable.updateSize();
2930
2931     }
2932
2933
2934
2935
2936     /*
2937      * Implementation of the interface RequestProcessor
2938      */
2939
2940     @Override
2941     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2942
2943         assertNotSession();
2944         assertAlive();
2945
2946         assert(request != null);
2947
2948         final DataContainer<T> result = new DataContainer<T>();
2949         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2950
2951         syncRequest(request, new AsyncProcedure<T>() {
2952
2953             @Override
2954             public void execute(AsyncReadGraph graph, T t) {
2955                 result.set(t);
2956             }
2957
2958             @Override
2959             public void exception(AsyncReadGraph graph, Throwable t) {
2960                 exception.set(t);
2961             }
2962
2963         });
2964
2965         Throwable t = exception.get();
2966         if(t != null) {
2967             if(t instanceof DatabaseException) throw (DatabaseException)t;
2968             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2969         }
2970
2971         return result.get();
2972
2973     }
2974
2975 //    @Override
2976 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2977 //        assertNotSession();
2978 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2979 //    }
2980
2981     @Override
2982     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2983         assertNotSession();
2984         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2985     }
2986
2987     @Override
2988     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2989         assertNotSession();
2990         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2991     }
2992
2993     @Override
2994     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2995         assertNotSession();
2996         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2997     }
2998
2999     @Override
3000     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
3001         assertNotSession();
3002         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3003     }
3004
3005     @Override
3006     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3007
3008         assertNotSession();
3009
3010         assert(request != null);
3011
3012         final DataContainer<T> result = new DataContainer<T>();
3013         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3014
3015         syncRequest(request, new AsyncProcedure<T>() {
3016
3017             @Override
3018             public void execute(AsyncReadGraph graph, T t) {
3019                 result.set(t);
3020             }
3021
3022             @Override
3023             public void exception(AsyncReadGraph graph, Throwable t) {
3024                 exception.set(t);
3025             }
3026
3027         });
3028
3029         Throwable t = exception.get();
3030         if(t != null) {
3031             if(t instanceof DatabaseException) throw (DatabaseException)t;
3032             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3033         }
3034
3035         return result.get();
3036
3037     }
3038
3039     @Override
3040     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3041         assertNotSession();
3042         return syncRequest(request, (AsyncProcedure<T>)procedure);
3043     }
3044
3045     @Override
3046     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3047         assertNotSession();
3048         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3049     }
3050
3051     @Override
3052     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3053         assertNotSession();
3054         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3055     }
3056
3057     @Override
3058     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3059         assertNotSession();
3060         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3061     }
3062
3063     @Override
3064     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3065         assertNotSession();
3066         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3067     }
3068
3069     @Override
3070     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3071
3072         assertNotSession();
3073
3074         assert(request != null);
3075
3076         final ArrayList<T> result = new ArrayList<T>();
3077         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3078
3079         syncRequest(request, new SyncMultiProcedure<T>() {
3080
3081             @Override
3082             public void execute(ReadGraph graph, T t) {
3083                 synchronized(result) {
3084                     result.add(t);
3085                 }
3086             }
3087
3088             @Override
3089             public void finished(ReadGraph graph) {
3090             }
3091
3092             @Override
3093             public void exception(ReadGraph graph, Throwable t) {
3094                 exception.set(t);
3095             }
3096
3097
3098         });
3099
3100         Throwable t = exception.get();
3101         if(t != null) {
3102             if(t instanceof DatabaseException) throw (DatabaseException)t;
3103             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3104         }
3105
3106         return result;
3107
3108     }
3109
3110     @Override
3111     public <T> Collection<T> syncRequest(MultiRead<T> arg0, SyncMultiProcedure<T> arg1) throws DatabaseException {
3112         assertNotSession();
3113         throw new Error("Not implemented!");
3114     }
3115
3116     @Override
3117     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3118         assertNotSession();
3119         return syncRequest(request, (SyncMultiProcedure<T>)procedure);
3120     }
3121
3122     @Override
3123     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3124         assertNotSession();
3125         return syncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3126     }
3127
3128     @Override
3129     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3130         assertNotSession();
3131         return syncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3132     }
3133
3134     @Override
3135     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3136
3137         assertNotSession();
3138
3139         assert(request != null);
3140
3141         final ArrayList<T> result = new ArrayList<T>();
3142         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3143
3144         syncRequest(request, new AsyncMultiProcedure<T>() {
3145
3146             @Override
3147             public void execute(AsyncReadGraph graph, T t) {
3148                 synchronized(result) {
3149                     result.add(t);
3150                 }
3151             }
3152
3153             @Override
3154             public void finished(AsyncReadGraph graph) {
3155             }
3156
3157             @Override
3158             public void exception(AsyncReadGraph graph, Throwable t) {
3159                 exception.set(t);
3160             }
3161
3162
3163         });
3164
3165         Throwable t = exception.get();
3166         if (t != null) {
3167             if (t instanceof DatabaseException)
3168                 throw (DatabaseException) t;
3169             else
3170                 throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3171         }
3172
3173         return result;
3174
3175     }
3176
3177     @Override
3178     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3179         assertNotSession();
3180         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3181     }
3182
3183     @Override
3184     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3185         assertNotSession();
3186         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3187     }
3188
3189     @Override
3190     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3191         assertNotSession();
3192        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3193     }
3194
3195     @Override
3196     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3197         assertNotSession();
3198         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3199     }
3200
3201     @Override
3202     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3203         assertNotSession();
3204         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3205     }
3206
3207
3208     @Override
3209     public <T> void asyncRequest(Read<T> request) {
3210
3211         asyncRequest(request, new ProcedureAdapter<T>() {
3212             @Override
3213             public void exception(Throwable t) {
3214                 t.printStackTrace();
3215             }
3216         });
3217
3218     }
3219
3220     @Override
3221     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3222         asyncRequest(request, (AsyncProcedure<T>)procedure);
3223     }
3224
3225     @Override
3226     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3227         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3228     }
3229
3230     @Override
3231     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3232         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3233     }
3234
3235     @Override
3236     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3237         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3238     }
3239
3240     @Override
3241     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3242         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3243     }
3244
3245     @Override
3246     final public <T> void asyncRequest(final AsyncRead<T> request) {
3247
3248         assert(request != null);
3249
3250         asyncRequest(request, new ProcedureAdapter<T>() {
3251             @Override
3252             public void exception(Throwable t) {
3253                 t.printStackTrace();
3254             }
3255         });
3256
3257     }
3258
    /**
     * Schedules an {@link AsyncRead} with an {@link AsyncListener}. The same
     * listener instance is handed to {@code scheduleRequest} in both of its
     * procedure-typed argument positions; the last argument is {@code null}.
     */
    @Override
    public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
        scheduleRequest(request, procedure, procedure, null);
    }
3263
3264     @Override
3265     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3266         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3267     }
3268
3269     @Override
3270     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3271         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3272     }
3273
3274     @Override
3275     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3276         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3277     }
3278
3279     @Override
3280     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3281         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3282     }
3283
3284     @Override
3285     public <T> void asyncRequest(MultiRead<T> request) {
3286
3287         assert(request != null);
3288
3289         asyncRequest(request, new SyncMultiProcedureAdapter<T>() {
3290             @Override
3291             public void exception(ReadGraph graph, Throwable t) {
3292                 t.printStackTrace();
3293             }
3294         });
3295
3296     }
3297
3298     @Override
3299     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3300         asyncRequest(request, (SyncMultiProcedure<T>)procedure);
3301     }
3302
3303     @Override
3304     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3305         asyncRequest(request, new NoneToSyncMultiListener<T>(procedure));
3306     }
3307
    /**
     * Schedules a {@link MultiRead} with a {@link SyncMultiProcedure} by
     * handing both to {@code scheduleRequest}; the last argument is
     * {@code null}.
     */
    @Override
    public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
        scheduleRequest(request, procedure, null);
    }
3312
3313     @Override
3314     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3315         asyncRequest(request, new NoneToSyncMultiProcedure<T>(procedure));
3316     }
3317
3318     @Override
3319     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3320
3321         assert(request != null);
3322
3323         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3324             @Override
3325             public void exception(AsyncReadGraph graph, Throwable t) {
3326                 t.printStackTrace();
3327             }
3328         });
3329
3330     }
3331
3332     @Override
3333     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3334         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3335     }
3336
3337     @Override
3338     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3339         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3340     }
3341
3342     @Override
3343     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3344         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3345     }
3346
3347     @Override
3348     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3349         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3350     }
3351
3352     @Override
3353     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3354         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3355     }
3356
3357     @Override
3358     final public <T> void asyncRequest(final ExternalRead<T> request) {
3359
3360         assert(request != null);
3361
3362         asyncRequest(request, new ProcedureAdapter<T>() {
3363             @Override
3364             public void exception(Throwable t) {
3365                 t.printStackTrace();
3366             }
3367         });
3368
3369     }
3370
3371     @Override
3372     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3373         asyncRequest(request, (Procedure<T>)procedure);
3374     }
3375
3376     boolean sameProvider(Write request) {
3377         if(writeState.getGraph().provider != null) {
3378             return writeState.getGraph().provider.equals(request.getProvider());
3379         } else {
3380             return request.getProvider() == null;
3381         }
3382     }
3383
3384     boolean plainWrite(WriteGraphImpl graph) {
3385         if(graph == null) return false;
3386         if(graph.writeSupport.writeOnly()) return false;
3387         return true;
3388     }
3389
3390
    /** Shared thread group named "Session Thread Group"; presumably the group session threads are created in (usage not visible here). */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3392
3393     private void assertNotSession() throws DatabaseException {
3394         Thread current = Thread.currentThread();
3395         if (sessionThreads.contains(current))
3396             throw new ServiceException("Caller is already inside a transaction.");
3397     }
3398
3399     void assertAlive() {
3400         if (!state.isAlive())
3401             throw new RuntimeDatabaseException("Session has been shut down.");
3402     }
3403
    /**
     * Returns the value of {@code resource} as a stream, by resolving the
     * resource's id through {@code querySupport} and delegating to
     * {@code querySupport.getValueStream}.
     *
     * @param graph the graph passed through to the query support
     * @param resource the resource whose value is requested
     * @return whatever stream {@code querySupport.getValueStream} returns
     */
    public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValueStream(graph, querySupport.getId(resource));
    }
3407
    /**
     * Returns the value of {@code resource} as a byte array, by resolving the
     * resource's id through {@code querySupport} and delegating to
     * {@code querySupport.getValue}.
     *
     * @param graph the graph passed through to the query support
     * @param resource the resource whose value is requested
     * @return whatever bytes {@code querySupport.getValue} returns
     */
    public byte[] getValue(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValue(graph, querySupport.getId(resource));
    }
3411
    /**
     * Convenience form of {@code acquire(semaphore, request, procedure)} with
     * no procedure ({@code null}).
     *
     * @param semaphore the semaphore to acquire one permit from
     * @param request the request being waited for
     * @throws DatabaseException as declared by the three-argument variant
     */
    <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
        acquire(semaphore, request, null);
    }
3415
3416     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3417         assertAlive();
3418         try {
3419             long tryCount = 0;
3420             // Loop until the semaphore is acquired reporting requests that take a long time.
3421             while (true) {
3422
3423                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3424                     // Acquired OK.
3425                     return;
3426                 } else {
3427                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3428                         ++tryCount;
3429                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3430                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3431                                 * tryCount;
3432                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3433                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3434                     }
3435                 }
3436
3437                 assertAlive();
3438
3439             }
3440         } catch (InterruptedException e) {
3441             e.printStackTrace();
3442             // FIXME: Should perhaps do something else in this case ??
3443         }
3444     }
3445
3446
3447
3448 //    @Override
3449 //    public boolean holdOnToTransactionAfterCancel() {
3450 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3451 //    }
3452 //
3453 //    @Override
3454 //    public boolean holdOnToTransactionAfterCommit() {
3455 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3456 //    }
3457 //
3458 //    @Override
3459 //    public boolean holdOnToTransactionAfterRead() {
3460 //        return transactionPolicy.holdOnToTransactionAfterRead();
3461 //    }
3462 //
3463 //    @Override
3464 //    public void onRelinquish() {
3465 //        transactionPolicy.onRelinquish();
3466 //    }
3467 //
3468 //    @Override
3469 //    public void onRelinquishDone() {
3470 //        transactionPolicy.onRelinquishDone();
3471 //    }
3472 //
3473 //    @Override
3474 //    public void onRelinquishError() {
3475 //        transactionPolicy.onRelinquishError();
3476 //    }
3477
3478
    /**
     * @return this instance; the object is its own session
     */
    @Override
    public Session getSession() {
        return this;
    }
3483
3484
    /** Implemented by subclasses; maps the given virtual graph to the provider to use (exact semantics are defined by the implementation). */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /** Implemented by subclasses; supplies a new {@code ResourceImpl}, or fails with a {@code DatabaseException}. */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3487
    /**
     * Forwards the notification for the given thread index to
     * {@code requestManager.ceased}.
     *
     * @param thread the thread identifier passed through unchanged
     */
    public void ceased(int thread) {

        requestManager.ceased(thread);

    }
3493
    /**
     * @return the number of query threads, currently fixed to 1; a
     *         processor-count based value is kept commented out below
     */
    public int getAmountOfQueryThreads() {
        // This must be a power of two
        return 1;
//        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
    }
3499
    /**
     * Creates a {@code ResourceImpl} for the given key, backed by this
     * session's {@code resourceSupport}. No lookup is performed in this
     * method body itself.
     *
     * @param key the resource key
     * @return a new resource object for {@code key}
     * @throws ResourceNotFoundException declared by the signature; not thrown directly here
     */
    public Resource getResourceByKey(int key) throws ResourceNotFoundException {

        return new ResourceImpl(resourceSupport, key);

    }
3505
    /** Sets the {@code writeOnly} flag of this session. */
    public void acquireWriteOnly() {
        writeOnly = true;
    }
3509
    /**
     * Clears the {@code writeOnly} flag and then calls
     * {@code queryProvider2.releaseWrite} with the given graph.
     *
     * @param graph the graph passed to the query provider
     */
    public void releaseWriteOnly(ReadGraphImpl graph) {
        writeOnly = false;
        queryProvider2.releaseWrite(graph);
    }
3514
3515     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3516
3517         long start = System.nanoTime();
3518
3519         while(dirtyPrimitives) {
3520             dirtyPrimitives = false;
3521             getQueryProvider2().propagateChangesInQueryCache(writer);
3522             getQueryProvider2().listening.fireListeners(writer);
3523         }
3524
3525         fireMetadataListeners(writer, clientChanges);
3526
3527         if(DebugPolicy.PERFORMANCE_DATA) {
3528             long end = System.nanoTime();
3529             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3530         }
3531
3532     }
3533
3534     void removeTemporaryData() {
3535         File platform = Platform.getLocation().toFile();
3536         File tempFiles = new File(platform, "tempFiles");
3537         File temp = new File(tempFiles, "db");
3538         if(!temp.exists()) 
3539             return;
3540         try {
3541             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3542                 @Override
3543                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3544                     try {
3545                         Files.delete(file);
3546                     } catch (IOException e) {
3547                         Logger.defaultLogError(e);
3548                     }
3549                     return FileVisitResult.CONTINUE;
3550                 }
3551             });
3552         } catch (IOException e) {
3553             Logger.defaultLogError(e);
3554         }
3555     }
3556
3557     public void handleCreatedClusters() {
3558         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3559             createdClusters.forEach(new TLongProcedure() {
3560
3561                 @Override
3562                 public boolean execute(long value) {
3563                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3564                     cluster.setImmutable(true, clusterTranslator);
3565                     return true;
3566                 }
3567             });
3568         }
3569         createdClusters.clear();
3570     }
3571
    /**
     * @return the {@code modificationCounter} object held by {@code queryProvider2}
     */
    @Override
    public Object getModificationCounter() {
        return queryProvider2.modificationCounter;
    }
3576     
    /**
     * Marks an undo point by switching the session state's combine flag off
     * ({@code state.setCombine(false)}). NOTE(review): presumably this stops
     * following changes from being merged into the previous undo step — confirm.
     */
    @Override
    public void markUndoPoint() {
        state.setCombine(false);
    }
3581
3582 }