]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Still working for multiple readers
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.BlockingAsyncProcedure;
54 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
55 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
59 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
63 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
64 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
65 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
66 import org.simantics.db.common.utils.Logger;
67 import org.simantics.db.event.ChangeEvent;
68 import org.simantics.db.event.ChangeListener;
69 import org.simantics.db.event.SessionEventListener;
70 import org.simantics.db.exception.CancelTransactionException;
71 import org.simantics.db.exception.ClusterSetExistException;
72 import org.simantics.db.exception.DatabaseException;
73 import org.simantics.db.exception.ImmutableException;
74 import org.simantics.db.exception.InvalidResourceReferenceException;
75 import org.simantics.db.exception.ResourceNotFoundException;
76 import org.simantics.db.exception.RuntimeDatabaseException;
77 import org.simantics.db.exception.ServiceException;
78 import org.simantics.db.exception.ServiceNotFoundException;
79 import org.simantics.db.impl.ClusterBase;
80 import org.simantics.db.impl.ClusterI;
81 import org.simantics.db.impl.ClusterTraitsBase;
82 import org.simantics.db.impl.ClusterTranslator;
83 import org.simantics.db.impl.ResourceImpl;
84 import org.simantics.db.impl.TransientGraph;
85 import org.simantics.db.impl.VirtualGraphImpl;
86 import org.simantics.db.impl.graph.DelayedWriteGraph;
87 import org.simantics.db.impl.graph.ReadGraphImpl;
88 import org.simantics.db.impl.graph.WriteGraphImpl;
89 import org.simantics.db.impl.graph.WriteSupport;
90 import org.simantics.db.impl.internal.RandomAccessValueSupport;
91 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
92 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
93 import org.simantics.db.impl.query.QueryCache;
94 import org.simantics.db.impl.query.QueryCacheBase;
95 import org.simantics.db.impl.query.QueryProcessor;
96 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
97 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
98 import org.simantics.db.impl.service.QueryDebug;
99 import org.simantics.db.impl.support.VirtualGraphServerSupport;
100 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
101 import org.simantics.db.procedure.AsyncListener;
102 import org.simantics.db.procedure.AsyncMultiListener;
103 import org.simantics.db.procedure.AsyncMultiProcedure;
104 import org.simantics.db.procedure.AsyncProcedure;
105 import org.simantics.db.procedure.Listener;
106 import org.simantics.db.procedure.ListenerBase;
107 import org.simantics.db.procedure.MultiListener;
108 import org.simantics.db.procedure.MultiProcedure;
109 import org.simantics.db.procedure.Procedure;
110 import org.simantics.db.procedure.SyncListener;
111 import org.simantics.db.procedure.SyncMultiListener;
112 import org.simantics.db.procedure.SyncMultiProcedure;
113 import org.simantics.db.procedure.SyncProcedure;
114 import org.simantics.db.procore.cluster.ClusterImpl;
115 import org.simantics.db.procore.cluster.ClusterTraits;
116 import org.simantics.db.procore.protocol.Constants;
117 import org.simantics.db.procore.protocol.DebugPolicy;
118 import org.simantics.db.request.AsyncMultiRead;
119 import org.simantics.db.request.AsyncRead;
120 import org.simantics.db.request.DelayedWrite;
121 import org.simantics.db.request.DelayedWriteResult;
122 import org.simantics.db.request.ExternalRead;
123 import org.simantics.db.request.MultiRead;
124 import org.simantics.db.request.Read;
125 import org.simantics.db.request.ReadInterface;
126 import org.simantics.db.request.Write;
127 import org.simantics.db.request.WriteInterface;
128 import org.simantics.db.request.WriteOnly;
129 import org.simantics.db.request.WriteOnlyResult;
130 import org.simantics.db.request.WriteResult;
131 import org.simantics.db.request.WriteTraits;
132 import org.simantics.db.service.ByteReader;
133 import org.simantics.db.service.ClusterBuilder;
134 import org.simantics.db.service.ClusterBuilderFactory;
135 import org.simantics.db.service.ClusterControl;
136 import org.simantics.db.service.ClusterSetsSupport;
137 import org.simantics.db.service.ClusterUID;
138 import org.simantics.db.service.ClusteringSupport;
139 import org.simantics.db.service.CollectionSupport;
140 import org.simantics.db.service.DebugSupport;
141 import org.simantics.db.service.DirectQuerySupport;
142 import org.simantics.db.service.GraphChangeListenerSupport;
143 import org.simantics.db.service.InitSupport;
144 import org.simantics.db.service.LifecycleSupport;
145 import org.simantics.db.service.ManagementSupport;
146 import org.simantics.db.service.QueryControl;
147 import org.simantics.db.service.SerialisationSupport;
148 import org.simantics.db.service.ServerInformation;
149 import org.simantics.db.service.ServiceActivityMonitor;
150 import org.simantics.db.service.SessionEventSupport;
151 import org.simantics.db.service.SessionMonitorSupport;
152 import org.simantics.db.service.SessionUserSupport;
153 import org.simantics.db.service.StatementSupport;
154 import org.simantics.db.service.TransactionPolicySupport;
155 import org.simantics.db.service.TransactionSupport;
156 import org.simantics.db.service.TransferableGraphSupport;
157 import org.simantics.db.service.UndoRedoSupport;
158 import org.simantics.db.service.VirtualGraphSupport;
159 import org.simantics.db.service.XSupport;
160 import org.simantics.layer0.Layer0;
161 import org.simantics.utils.DataContainer;
162 import org.simantics.utils.Development;
163 import org.simantics.utils.threads.logger.ITask;
164 import org.simantics.utils.threads.logger.ThreadLogger;
165
166 import gnu.trove.procedure.TLongProcedure;
167 import gnu.trove.set.hash.TLongHashSet;
168
169
170 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
171
172     protected static final boolean DEBUG        = false;
173
174     private static final boolean DIAGNOSTICS       = false;
175
176     private TransactionPolicySupport transactionPolicy;
177
178     final private ClusterControl clusterControl;
179
180     final protected BuiltinSupportImpl builtinSupport;
181     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
182     final protected ClusterSetsSupport clusterSetsSupport;
183     final protected LifecycleSupportImpl lifecycleSupport;
184
185     protected QuerySupportImpl querySupport;
186     protected ResourceSupportImpl resourceSupport;
187     protected WriteSupport writeSupport;
188     public ClusterTranslator clusterTranslator;
189
190     boolean dirtyPrimitives = false;
191
192     public static final int SERVICE_MODE_CREATE = 2;
193     public static final int SERVICE_MODE_ALLOW = 1;
194     public int serviceMode = 0;
195     public boolean createdImmutableClusters = false;
196     public TLongHashSet createdClusters = new TLongHashSet();
197
198     private Layer0 L0;
199
200     /**
201      * The service locator maintained by the workbench. These services are
202      * initialized by the workbench during the <code>init</code> method.
203      */
204     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
205
206     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
207
208     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
209
210     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
211
212     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
213
214     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
215
216     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
217
218     final protected State                                        state              = new State();
219
220     protected GraphSession                                       graphSession       = null;
221
222     protected SessionManager                                     sessionManagerImpl = null;
223
224     protected UserAuthenticationAgent                            authAgent          = null;
225
226     protected UserAuthenticator                                  authenticator      = null;
227
228     protected Resource                                           user               = null;
229
230     protected ClusterStream                                      clusterStream      = null;
231
232     protected SessionRequestManager                         requestManager = null;
233
234     public ClusterTable                                       clusterTable       = null;
235
236     public QueryProcessor                                     queryProvider2 = null;
237
238     ClientChangesImpl                                  clientChanges      = null;
239
240     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
241
242 //    protected long                                               newClusterId       = Constants.NullClusterId;
243
244     protected int                                                flushCounter       = 0;
245
246     protected boolean                                            writeOnly          = false;
247
248     WriteState<?>                                                writeState = null;
249     WriteStateBase<?>                                            delayedWriteState = null;
250     protected Resource defaultClusterSet = null; // If not null then used for newResource().
251
252     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
253
254 //        if (authAgent == null)
255 //            throw new IllegalArgumentException("null authentication agent");
256
257         File t = StaticSessionProperties.virtualGraphStoragePath;
258         if (null == t)
259             t = new File(".");
260         this.clusterTable = new ClusterTable(this, t);
261         this.builtinSupport = new BuiltinSupportImpl(this);
262         this.sessionManagerImpl = sessionManagerImpl;
263         this.user = null;
264 //        this.authAgent = authAgent;
265
266         serviceLocator.registerService(Session.class, this);
267         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
268         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
269         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
270         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
271         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
272         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
273         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
274         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
275         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
276         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
277         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
278         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
279         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
280         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
281         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
282         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
283         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
284         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
285         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
286         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
287         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
288         ServiceActivityUpdaterForWriteTransactions.register(this);
289
290         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
291         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
292         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
293         this.lifecycleSupport = new LifecycleSupportImpl(this);
294         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
295         this.transactionPolicy = new TransactionPolicyRelease();
296         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
297         this.clusterControl = new ClusterControlImpl(this);
298         serviceLocator.registerService(ClusterControl.class, clusterControl);
299         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
300         this.clusterSetsSupport.setReadDirectory(t.toPath());
301         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
302         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
303     }
304
305     /*
306      *
307      * Session interface
308      */
309
310
311     @Override
312     public Resource getRootLibrary() {
313         return queryProvider2.getRootLibraryResource();
314     }
315
316     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
317         if (!graphSession.dbSession.refreshEnabled())
318             return;
319         try {
320             getClusterTable().refresh(csid, this, clusterUID);
321         } catch (Throwable t) {
322             Logger.defaultLogError("Refesh failed.", t);
323         }
324     }
325
326     static final class TaskHelper {
327         private final String name;
328         private Object result;
329         final Semaphore sema = new Semaphore(0);
330         private Throwable throwable = null;
331         final Consumer<DatabaseException> callback = e -> {
332             synchronized (TaskHelper.this) {
333                 throwable = e;
334             }
335         };
336         final Procedure<Object> proc = new Procedure<Object>() {
337             @Override
338             public void execute(Object result) {
339                 callback.accept(null);
340             }
341             @Override
342             public void exception(Throwable t) {
343                 if (t instanceof DatabaseException)
344                     callback.accept((DatabaseException)t);
345                 else
346                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
347             }
348         };
349         final WriteTraits writeTraits = new WriteTraits() {};
350         TaskHelper(String name) {
351             this.name = name;
352         }
353         @SuppressWarnings("unchecked")
354         <T> T getResult() {
355             return (T)result;
356         }
357         void setResult(Object result) {
358             this.result = result;
359         }
360 //        Throwable throwableGet() {
361 //            return throwable;
362 //        }
363         synchronized void throwableSet(Throwable t) {
364             throwable = t;
365         }
366 //        void throwableSet(String t) {
367 //            throwable = new InternalException("" + name + " operation failed. " + t);
368 //        }
369         synchronized void throwableCheck()
370         throws DatabaseException {
371             if (null != throwable)
372                 if (throwable instanceof DatabaseException)
373                     throw (DatabaseException)throwable;
374                 else
375                     throw new DatabaseException("Undo operation failed.", throwable);
376         }
377         void throw_(String message)
378         throws DatabaseException {
379             throw new DatabaseException("" + name + " operation failed. " + message);
380         }
381     }
382
383
384
385     private ListenerBase getListenerBase(Object procedure) {
386         if (procedure instanceof ListenerBase)
387             return (ListenerBase) procedure;
388         else
389             return null;
390     }
391
392     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
393         scheduleRequest(request, callback, notify, null);
394     }
395
396     /* (non-Javadoc)
397      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
398      */
399     @Override
400     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
401
402         assert (request != null);
403
404         if(Development.DEVELOPMENT) {
405             try {
406             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
407                 System.err.println("schedule write '" + request + "'");
408             } catch (Throwable t) {
409                 Logger.defaultLogError(t);
410             }
411         }
412
413         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
414
415         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
416
417             @Override
418             public void run(int thread) {
419
420                 if(Development.DEVELOPMENT) {
421                     try {
422                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
423                         System.err.println("perform write '" + request + "'");
424                     } catch (Throwable t) {
425                         Logger.defaultLogError(t);
426                     }
427                 }
428
429                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
430
431                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
432
433                 try {
434
435                     flushCounter = 0;
436                     Disposable.safeDispose(clientChanges);
437                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
438
439                     VirtualGraph vg = getProvider(request.getProvider());
440
441                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
442                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
443
444                         @Override
445                         public void execute(Object result) {
446                             if(callback != null) callback.accept(null);
447                         }
448
449                         @Override
450                         public void exception(Throwable t) {
451                             if(callback != null) callback.accept((DatabaseException)t);
452                         }
453
454                     });
455
456                     assert (null != writer);
457 //                    writer.state.barrier.inc();
458
459                     try {
460                         request.perform(writer);
461                         assert (null != writer);
462                     } catch (Throwable t) {
463                         if (!(t instanceof CancelTransactionException))
464                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
465                         writeState.except(t);
466                     } finally {
467 //                        writer.state.barrier.dec();
468 //                        writer.waitAsync(request);
469                     }
470
471
472                     assert(!queryProvider2.cache.dirty);
473
474                 } catch (Throwable e) {
475
476                     // Log it first, just to be safe that the error is always logged.
477                     if (!(e instanceof CancelTransactionException))
478                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
479
480 //                    writeState.getGraph().state.barrier.dec();
481 //                    writeState.getGraph().waitAsync(request);
482
483                     writeState.except(e);
484
485 //                    try {
486 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
487 //                        // All we can do here is to log those, can't really pass them anywhere.
488 //                        if (callback != null) {
489 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
490 //                            else callback.run(new DatabaseException(e));
491 //                        }
492 //                    } catch (Throwable e2) {
493 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
494 //                    }
495 //                    boolean empty = clusterStream.reallyFlush();
496 //                    int callerThread = -1;
497 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
498 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
499 //                    try {
500 //                        // Send all local changes to server so it can calculate correct reverse change set.
501 //                        // This will call clusterStream.accept().
502 //                        state.cancelCommit(context, clusterStream);
503 //                        if (!empty) {
504 //                            if (!context.isOk()) // this is a blocking operation
505 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
506 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
507 //                        }
508 //                        state.cancelCommit2(context, clusterStream);
509 //                    } catch (DatabaseException e3) {
510 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
511 //                    } finally {
512 //                        clusterStream.setOff(false);
513 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
514 //                    }
515
516                 } finally {
517
518                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
519
520                 }
521
522                 task.finish();
523
524                 if(Development.DEVELOPMENT) {
525                     try {
526                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
527                         System.err.println("finish write '" + request + "'");
528                     } catch (Throwable t) {
529                         Logger.defaultLogError(t);
530                     }
531                 }
532
533             }
534
535         }, combine);
536
537     }
538
539     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
540         scheduleRequest(request, procedure, notify, null);
541     }
542
543     /* (non-Javadoc)
544      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
545      */
546     @Override
547     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
548
549         assert (request != null);
550
551         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
552
553         requestManager.scheduleWrite(new SessionTask(request, thread) {
554
555             @Override
556             public void run(int thread) {
557
558                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
559
560                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
561
562                 flushCounter = 0;
563                 Disposable.safeDispose(clientChanges);
564                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
565
566                 VirtualGraph vg = getProvider(request.getProvider());
567                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
568
569                 try {
570                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
571                     writeState = writeStateT;
572
573                     assert (null != writer);
574 //                    writer.state.barrier.inc();
575                     writeStateT.setResult(request.perform(writer));
576                     assert (null != writer);
577
578 //                    writer.state.barrier.dec();
579 //                    writer.waitAsync(null);
580
581                 } catch (Throwable e) {
582
583 //                    writer.state.barrier.dec();
584 //                    writer.waitAsync(null);
585
586                     writeState.except(e);
587
588 //                  state.stopWriteTransaction(clusterStream);
589 //
590 //              } catch (Throwable e) {
591 //                  // Log it first, just to be safe that the error is always logged.
592 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
593 //
594 //                  try {
595 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
596 //                      // All we can do here is to log those, can't really pass them anywhere.
597 //                      if (procedure != null) {
598 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
599 //                          else procedure.exception(new DatabaseException(e));
600 //                      }
601 //                  } catch (Throwable e2) {
602 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
603 //                  }
604 //
605 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
606 //
607 //                  state.stopWriteTransaction(clusterStream);
608
609                 } finally {
610                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
611                 }
612
613 //              if(notify != null) notify.release();
614
615                 task.finish();
616
617             }
618
619         }, combine);
620
621     }
622
623     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
624         scheduleRequest(request, callback, notify, null);
625     }
626
627     /* (non-Javadoc)
628      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
629      */
630     @Override
631     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
632
633         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
634
635         assert (request != null);
636
637         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
638
639         requestManager.scheduleWrite(new SessionTask(request, thread) {
640
641             @Override
642             public void run(int thread) {
643                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
644
645                 Procedure<Object> stateProcedure = new Procedure<Object>() {
646                     @Override
647                     public void execute(Object result) {
648                         if (callback != null)
649                             callback.accept(null);
650                     }
651                     @Override
652                     public void exception(Throwable t) {
653                         if (callback != null) {
654                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
655                             else callback.accept(new DatabaseException(t));
656                         } else
657                             Logger.defaultLogError("Unhandled exception", t);
658                     }
659                 };
660
661                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
662                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
663                 DelayedWriteGraph dwg = null;
664 //                newGraph.state.barrier.inc();
665
666                 try {
667                     dwg = new DelayedWriteGraph(newGraph);
668                     request.perform(dwg);
669                 } catch (Throwable e) {
670                     delayedWriteState.except(e);
671                     total.finish();
672                     dwg.close();
673                     return;
674                 } finally {
675 //                    newGraph.state.barrier.dec();
676 //                    newGraph.waitAsync(request);
677                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
678                 }
679
680                 delayedWriteState = null;
681
682                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
683                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
684
685                 flushCounter = 0;
686                 Disposable.safeDispose(clientChanges);
687                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
688
689                 acquireWriteOnly();
690
691                 VirtualGraph vg = getProvider(request.getProvider());
692                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
693
694                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
695
696                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
697
698                 assert (null != writer);
699 //                writer.state.barrier.inc();
700
701                 try {
702                     // Cannot cancel
703                     dwg.commit(writer, request);
704
705                         if(defaultClusterSet != null) {
706                                 XSupport xs = getService(XSupport.class);
707                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
708                                 ClusteringSupport cs = getService(ClusteringSupport.class);
709                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
710                         }
711
712                     // This makes clusters available from server
713                     clusterStream.reallyFlush();
714
715                     releaseWriteOnly(writer);
716                     handleUpdatesAndMetadata(writer);
717
718                 } catch (ServiceException e) {
719 //                    writer.state.barrier.dec();
720 //                    writer.waitAsync(null);
721
722                     // These shall be requested from server
723                     clusterTable.removeWriteOnlyClusters();
724                     // This makes clusters available from server
725                     clusterStream.reallyFlush();
726
727                     releaseWriteOnly(writer);
728                     writeState.except(e);
729                 } finally {
730                     // Debugging & Profiling
731                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
732                     task2.finish();
733                     total.finish();
734                 }
735             }
736
737         }, combine);
738
739     }
740
741     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
742         scheduleRequest(request, procedure, notify, null);
743     }
744
745     /* (non-Javadoc)
746      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
747      */
748     @Override
749     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
750         throw new Error("Not implemented");
751     }
752
753     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
754         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
755         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
756             createdClusters.add(cluster.clusterId);
757         }
758         return cluster;
759     }
760
761     class WriteOnlySupport implements WriteSupport {
762
763         ClusterStream stream;
764         ClusterImpl currentCluster;
765
766         public WriteOnlySupport() {
767             this.stream = clusterStream;
768         }
769
770         @Override
771         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
772             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
773         }
774
775         @Override
776         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
777
778             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
779
780             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
781                 if(s != queryProvider2.getRootLibrary())
782                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
783
784             try {
785                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
786             } catch (DatabaseException e) {
787                 Logger.defaultLogError(e);
788                 //throw e;
789             }
790
791             clientChanges.invalidate(s);
792
793             if (cluster.isWriteOnly())
794                 return;
795             queryProvider2.updateStatements(s, p);
796
797         }
798
799         @Override
800         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
801             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
802         }
803
804         @Override
805         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
806
807             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
808             try {
809                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
810             } catch (DatabaseException e) {
811                 Logger.defaultLogError(e);
812             }
813
814             clientChanges.invalidate(rid);
815
816             if (cluster.isWriteOnly())
817                 return;
818             queryProvider2.updateValue(rid);
819
820         }
821
822         @Override
823         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
824
825             if(amount < 65536) {
826                 claimValue(provider,resource, reader.readBytes(null, amount));
827                 return;
828             }
829
830             byte[] bytes = new byte[65536];
831
832             int rid = ((ResourceImpl)resource).id;
833             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
834             try {
835                 int left = amount;
836                 while(left > 0) {
837                     int block = Math.min(left, 65536);
838                     reader.readBytes(bytes, block);
839                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
840                     left -= block;
841                 }
842             } catch (DatabaseException e) {
843                 Logger.defaultLogError(e);
844             }
845
846             clientChanges.invalidate(rid);
847
848             if (cluster.isWriteOnly())
849                 return;
850             queryProvider2.updateValue(rid);
851
852         }
853
854         private void maintainCluster(ClusterImpl before, ClusterI after_) {
855             if(after_ != null && after_ != before) {
856                 ClusterImpl after = (ClusterImpl)after_;
857                 if(currentCluster == before) {
858                     currentCluster = after;
859                 }
860                 clusterTable.replaceCluster(after);
861             }
862         }
863
864         public int createResourceKey(int foreignCounter) throws DatabaseException {
865             if(currentCluster == null) {
866                 currentCluster = getNewResourceCluster();
867             }
868             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
869                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
870                 newCluster.foreignLookup = new byte[foreignCounter];
871                 currentCluster = newCluster;
872                 if (DEBUG)
873                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
874             }
875             return currentCluster.createResource(clusterTranslator);
876         }
877
878         @Override
879         public Resource createResource(VirtualGraph provider) throws DatabaseException {
880             if(currentCluster == null) {
881                 if (null != defaultClusterSet) {
882                         ResourceImpl result = getNewResource(defaultClusterSet);
883                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
884                     return result;
885                 } else {
886                     currentCluster = getNewResourceCluster();
887                 }
888             }
889             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
890                 if (null != defaultClusterSet) {
891                         ResourceImpl result = getNewResource(defaultClusterSet);
892                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
893                     return result;
894                 } else {
895                     currentCluster = getNewResourceCluster();
896                 }
897             }
898             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
899         }
900
901         @Override
902         public Resource createResource(VirtualGraph provider, long clusterId)
903         throws DatabaseException {
904             return getNewResource(clusterId);
905         }
906
907         @Override
908         public Resource createResource(VirtualGraph provider, Resource clusterSet)
909         throws DatabaseException {
910             return getNewResource(clusterSet);
911         }
912
913         @Override
914         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
915                 throws DatabaseException {
916             getNewClusterSet(clusterSet);
917         }
918
919         @Override
920         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
921         throws ServiceException {
922             return containsClusterSet(clusterSet);
923         }
924
925         public void selectCluster(long cluster) {
926                 currentCluster = clusterTable.getClusterByClusterId(cluster);
927                 long setResourceId = clusterSetsSupport.getSet(cluster);
928                 clusterSetsSupport.put(setResourceId, cluster);
929         }
930
931         @Override
932         public Resource setDefaultClusterSet(Resource clusterSet)
933         throws ServiceException {
934                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
935                 if(clusterSet != null) {
936                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
937                         currentCluster = clusterTable.getClusterByClusterId(id);
938                         return result;
939                 } else {
940                         currentCluster = null;
941                         return null;
942                 }
943         }
944
945         @Override
946         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
947
948                  provider = getProvider(provider);
949              if (null == provider) {
950                  int key = ((ResourceImpl)resource).id;
951
952
953
954                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
955 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
956 //                      if(key != queryProvider2.getRootLibrary())
957 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
958
959 //                 try {
960 //                     cluster.removeValue(key, clusterTranslator);
961 //                 } catch (DatabaseException e) {
962 //                     Logger.defaultLogError(e);
963 //                     return;
964 //                 }
965
966                  clusterTable.writeOnlyInvalidate(cluster);
967
968                  try {
969                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
970                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
971                          clusterTranslator.removeValue(cluster);
972                  } catch (DatabaseException e) {
973                          Logger.defaultLogError(e);
974                  }
975
976                  queryProvider2.invalidateResource(key);
977                  clientChanges.invalidate(key);
978
979              } else {
980                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
981                  queryProvider2.updateValue(querySupport.getId(resource));
982                  clientChanges.claimValue(resource);
983              }
984
985
986         }
987
988         @Override
989         public void flush(boolean intermediate) {
990             throw new UnsupportedOperationException();
991         }
992
993         @Override
994         public void flushCluster() {
995             clusterTable.flushCluster(graphSession);
996             if(defaultClusterSet != null) {
997                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
998             }
999             currentCluster = null;
1000         }
1001
1002         @Override
1003         public void flushCluster(Resource r) {
1004             throw new UnsupportedOperationException("flushCluster resource " + r);
1005         }
1006
1007         @Override
1008         public void gc() {
1009         }
1010
1011         @Override
1012         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1013                 Resource object) {
1014
1015                 int s = ((ResourceImpl)subject).id;
1016                 int p = ((ResourceImpl)predicate).id;
1017                 int o = ((ResourceImpl)object).id;
1018
1019                 provider = getProvider(provider);
1020                 if (null == provider) {
1021
1022                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1023                         clusterTable.writeOnlyInvalidate(cluster);
1024
1025                         try {
1026
1027                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1028                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1029                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1030
1031                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1032                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1033
1034                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1035                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1036                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1037                                 clusterTranslator.removeStatement(cluster);
1038
1039                                 queryProvider2.invalidateResource(s);
1040                                 clientChanges.invalidate(s);
1041
1042                         } catch (DatabaseException e) {
1043
1044                                 Logger.defaultLogError(e);
1045
1046                         }
1047
1048                         return true;
1049
1050                 } else {
1051                         
1052                         ((VirtualGraphImpl)provider).deny(s, p, o);
1053                         queryProvider2.invalidateResource(s);
1054                         clientChanges.invalidate(s);
1055
1056                         return true;
1057
1058                 }
1059
1060         }
1061
1062         @Override
1063         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1064             throw new UnsupportedOperationException();
1065         }
1066
1067         @Override
1068         public boolean writeOnly() {
1069             return true;
1070         }
1071
1072         @Override
1073         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1074             writeSupport.performWriteRequest(graph, request);
1075         }
1076
1077         @Override
1078         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1079             throw new UnsupportedOperationException();
1080         }
1081
1082         @Override
1083         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1084             throw new UnsupportedOperationException();
1085         }
1086
1087         @Override
1088         public <T> void addMetadata(Metadata data) throws ServiceException {
1089             writeSupport.addMetadata(data);
1090         }
1091
1092         @Override
1093         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1094             return writeSupport.getMetadata(clazz);
1095         }
1096
1097         @Override
1098         public TreeMap<String, byte[]> getMetadata() {
1099             return writeSupport.getMetadata();
1100         }
1101
1102         @Override
1103         public void commitDone(WriteTraits writeTraits, long csid) {
1104             writeSupport.commitDone(writeTraits, csid);
1105         }
1106
1107         @Override
1108         public void clearUndoList(WriteTraits writeTraits) {
1109             writeSupport.clearUndoList(writeTraits);
1110         }
1111         @Override
1112         public int clearMetadata() {
1113             return writeSupport.clearMetadata();
1114         }
1115
1116                 @Override
1117                 public void startUndo() {
1118                         writeSupport.startUndo();
1119                 }
1120     }
1121
1122     class VirtualWriteOnlySupport implements WriteSupport {
1123
1124 //        @Override
1125 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1126 //                Resource object) {
1127 //            throw new UnsupportedOperationException();
1128 //        }
1129
1130         @Override
1131         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1132
1133             TransientGraph impl = (TransientGraph)provider;
1134             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1135             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1136             clientChanges.claim(subject, predicate, object);
1137
1138         }
1139
1140         @Override
1141         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1142
1143             TransientGraph impl = (TransientGraph)provider;
1144             impl.claim(subject, predicate, object);
1145             getQueryProvider2().updateStatements(subject, predicate);
1146             clientChanges.claim(subject, predicate, object);
1147
1148         }
1149
1150         @Override
1151         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1152             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1153         }
1154
1155         @Override
1156         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1157             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1158             getQueryProvider2().updateValue(resource);
1159             clientChanges.claimValue(resource);
1160         }
1161
1162         @Override
1163         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1164             byte[] value = reader.readBytes(null, amount);
1165             claimValue(provider, resource, value);
1166         }
1167
1168         @Override
1169         public Resource createResource(VirtualGraph provider) {
1170             TransientGraph impl = (TransientGraph)provider;
1171             return impl.getResource(impl.newResource(false));
1172         }
1173
1174         @Override
1175         public Resource createResource(VirtualGraph provider, long clusterId) {
1176             throw new UnsupportedOperationException();
1177         }
1178
1179         @Override
1180         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1181         throws DatabaseException {
1182             throw new UnsupportedOperationException();
1183         }
1184
1185         @Override
1186         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1187         throws DatabaseException {
1188             throw new UnsupportedOperationException();
1189         }
1190
1191         @Override
1192         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1193         throws ServiceException {
1194             throw new UnsupportedOperationException();
1195         }
1196
1197         @Override
1198         public Resource setDefaultClusterSet(Resource clusterSet)
1199         throws ServiceException {
1200                 return null;
1201         }
1202
1203         @Override
1204         public void denyValue(VirtualGraph provider, Resource resource) {
1205             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1206             getQueryProvider2().updateValue(querySupport.getId(resource));
1207             // NOTE: this only keeps track of value changes by-resource.
1208             clientChanges.claimValue(resource);
1209         }
1210
1211         @Override
1212         public void flush(boolean intermediate) {
1213             throw new UnsupportedOperationException();
1214         }
1215
1216         @Override
1217         public void flushCluster() {
1218             throw new UnsupportedOperationException();
1219         }
1220
1221         @Override
1222         public void flushCluster(Resource r) {
1223             throw new UnsupportedOperationException("Resource " + r);
1224         }
1225
1226         @Override
1227         public void gc() {
1228         }
1229
1230         @Override
1231         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1232             TransientGraph impl = (TransientGraph) provider;
1233             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1234             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1235             clientChanges.deny(subject, predicate, object);
1236             return true;
1237         }
1238
1239         @Override
1240         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1241             throw new UnsupportedOperationException();
1242         }
1243
1244         @Override
1245         public boolean writeOnly() {
1246             return true;
1247         }
1248
1249         @Override
1250         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1251             throw new UnsupportedOperationException();
1252         }
1253
1254         @Override
1255         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1256             throw new UnsupportedOperationException();
1257         }
1258
1259         @Override
1260         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1261             throw new UnsupportedOperationException();
1262         }
1263
1264         @Override
1265         public <T> void addMetadata(Metadata data) throws ServiceException {
1266             throw new UnsupportedOperationException();
1267         }
1268
1269         @Override
1270         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1271             throw new UnsupportedOperationException();
1272         }
1273
1274         @Override
1275         public TreeMap<String, byte[]> getMetadata() {
1276             throw new UnsupportedOperationException();
1277         }
1278
1279         @Override
1280         public void commitDone(WriteTraits writeTraits, long csid) {
1281         }
1282
1283         @Override
1284         public void clearUndoList(WriteTraits writeTraits) {
1285         }
1286
1287         @Override
1288         public int clearMetadata() {
1289             return 0;
1290         }
1291
1292                 @Override
1293                 public void startUndo() {
1294                 }
1295     }
1296
1297     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1298
1299         try {
1300
1301             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1302
1303             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1304
1305             flushCounter = 0;
1306             Disposable.safeDispose(clientChanges);
1307             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1308
1309             acquireWriteOnly();
1310
1311             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1312
1313             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1314
1315             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1316             writeState = writeStateT;
1317
1318             assert (null != writer);
1319 //            writer.state.barrier.inc();
1320             long start = System.nanoTime();
1321             T result = request.perform(writer);
1322             long duration = System.nanoTime() - start;
1323             if (DEBUG) {
1324                 System.err.println("################");
1325                 System.err.println("WriteOnly duration " + 1e-9*duration);
1326             }
1327             writeStateT.setResult(result);
1328
1329             // This makes clusters available from server
1330             clusterStream.reallyFlush();
1331
1332             // This will trigger query updates
1333             releaseWriteOnly(writer);
1334             assert (null != writer);
1335
1336             handleUpdatesAndMetadata(writer);
1337
1338         } catch (CancelTransactionException e) {
1339
1340             releaseWriteOnly(writeState.getGraph());
1341
1342             clusterTable.removeWriteOnlyClusters();
1343             state.stopWriteTransaction(clusterStream);
1344
1345         } catch (Throwable e) {
1346
1347             e.printStackTrace();
1348
1349             releaseWriteOnly(writeState.getGraph());
1350
1351             clusterTable.removeWriteOnlyClusters();
1352
1353             if (callback != null)
1354                 callback.exception(new DatabaseException(e));
1355
1356             state.stopWriteTransaction(clusterStream);
1357             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1358
1359         } finally  {
1360
1361             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1362
1363         }
1364
1365     }
1366
1367     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1368         scheduleRequest(request, callback, notify, null);
1369     }
1370
1371     @Override
1372     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1373
1374         assertAlive();
1375
1376         assert (request != null);
1377
1378         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1379
1380         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1381
1382             @Override
1383             public void run(int thread) {
1384
1385                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1386
1387                     try {
1388
1389                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1390
1391                         flushCounter = 0;
1392                         Disposable.safeDispose(clientChanges);
1393                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1394
1395                         acquireWriteOnly();
1396
1397                         VirtualGraph vg = getProvider(request.getProvider());
1398                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1399
1400                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1401
1402                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1403
1404                             @Override
1405                             public void execute(Object result) {
1406                                 if(callback != null) callback.accept(null);
1407                             }
1408
1409                             @Override
1410                             public void exception(Throwable t) {
1411                                 if(callback != null) callback.accept((DatabaseException)t);
1412                             }
1413
1414                         });
1415
1416                         assert (null != writer);
1417 //                        writer.state.barrier.inc();
1418
1419                         try {
1420
1421                             request.perform(writer);
1422
1423                         } catch (Throwable e) {
1424
1425 //                            writer.state.barrier.dec();
1426 //                            writer.waitAsync(null);
1427
1428                             releaseWriteOnly(writer);
1429
1430                             clusterTable.removeWriteOnlyClusters();
1431
1432                             if(!(e instanceof CancelTransactionException)) {
1433                                 if (callback != null)
1434                                     callback.accept(new DatabaseException(e));
1435                             }
1436
1437                             writeState.except(e);
1438
1439                             return;
1440
1441                         }
1442
1443                         // This makes clusters available from server
1444                         boolean empty = clusterStream.reallyFlush();
1445                         // This was needed to make WO requests call metadata listeners.
1446                         // NOTE: the calling event does not contain clientChanges information.
1447                         if (!empty && clientChanges.isEmpty())
1448                             clientChanges.setNotEmpty(true);
1449                         releaseWriteOnly(writer);
1450                         assert (null != writer);
1451                     } finally  {
1452
1453                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1454
1455                     }
1456
1457
1458                 task.finish();
1459
1460             }
1461
1462         }, combine);
1463
1464     }
1465
1466     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1467         scheduleRequest(request, callback, notify, null);
1468     }
1469
1470     /* (non-Javadoc)
1471      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1472      */
1473     @Override
1474     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1475
1476         assert (request != null);
1477
1478         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1479
1480         requestManager.scheduleWrite(new SessionTask(request, thread) {
1481
1482             @Override
1483             public void run(int thread) {
1484
1485                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1486
1487                 performWriteOnly(request, notify, callback);
1488
1489                 task.finish();
1490
1491             }
1492
1493         }, combine);
1494
1495     }
1496
1497     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1498
1499         assert (request != null);
1500         assert (procedure != null);
1501
1502         //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1503
1504         requestManager.scheduleRead(new SessionRead(request, throwable, notify, queryProvider2.THREAD_MASK + 1, -1) {
1505
1506             @Override
1507             public void run(int thread) {
1508
1509                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1510
1511                 ListenerBase listener = getListenerBase(procedure);
1512
1513                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1514
1515                 try {
1516
1517                     if (listener != null) {
1518
1519                         try {
1520                                 
1521                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1522
1523                                         @Override
1524                                         public void exception(AsyncReadGraph graph, Throwable t) {
1525                                                 procedure.exception(graph, t);
1526                                                 if(throwable != null) {
1527                                                         throwable.set(t);
1528                                                 } else {
1529                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1530                                                 }
1531                                         }
1532
1533                                         @Override
1534                                         public void execute(AsyncReadGraph graph, T t) {
1535                                                 if(result != null) result.set(t);
1536                                                 procedure.execute(graph, t);
1537                                         }
1538
1539                                 };
1540                                 
1541                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap, true);
1542                                 
1543                         } catch (Throwable t) {
1544                             // This is handled by the AsyncProcedure
1545                                 //Logger.defaultLogError("Internal error", t);
1546                         }
1547
1548                     } else {
1549
1550                         try {
1551
1552 //                            newGraph.state.barrier.inc();
1553
1554                             T t = request.perform(newGraph);
1555
1556                             try {
1557
1558                                 if(result != null) result.set(t);
1559                                 procedure.execute(newGraph, t);
1560
1561                             } catch (Throwable th) {
1562
1563                                 if(throwable != null) {
1564                                     throwable.set(th);
1565                                 } else {
1566                                     Logger.defaultLogError("Unhandled exception", th);
1567                                 }
1568
1569                             }
1570
1571                         } catch (Throwable t) {
1572
1573                             if (DEBUG)
1574                                 t.printStackTrace();
1575
1576                             if(throwable != null) {
1577                                 throwable.set(t);
1578                             } else {
1579                                 Logger.defaultLogError("Unhandled exception", t);
1580                             }
1581
1582                             try {
1583
1584                                 procedure.exception(newGraph, t);
1585
1586                             } catch (Throwable t2) {
1587
1588                                 if(throwable != null) {
1589                                     throwable.set(t2);
1590                                 } else {
1591                                     Logger.defaultLogError("Unhandled exception", t2);
1592                                 }
1593
1594                             }
1595
1596                         }
1597
1598 //                        newGraph.state.barrier.dec();
1599 //                        newGraph.waitAsync(request);
1600
1601                     }
1602
1603                 } finally {
1604
1605                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1606
1607                 }
1608
1609             }
1610
1611         });
1612
1613     }
1614
1615     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1616
1617         assert (request != null);
1618         assert (procedure != null);
1619
1620         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1621
1622         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1623
1624             @Override
1625             public void run(int thread) {
1626
1627                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1628
1629                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1630
1631                 try {
1632
1633                     if (listener != null) {
1634
1635                         try {
1636                                 QueryCacheBase.resultAsyncReadEntry(newGraph, request, null, listener, procedure);
1637                                 //QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure, true);
1638                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1639                                                 } catch (DatabaseException e) {
1640                                                         Logger.defaultLogError(e);
1641                                                 }
1642
1643                     } else {
1644
1645 //                        final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1646 //                                procedure, "request");
1647
1648                         BlockingAsyncProcedure<T> wrap = new BlockingAsyncProcedure<T>(procedure, request);
1649
1650                         try {
1651
1652                             request.perform(newGraph, wrap);
1653                             wrap.get();
1654
1655                         } catch (Throwable t) {
1656
1657                                 wrap.exception(newGraph, t);
1658                                 
1659 //                            wrapper.exception(newGraph, t);
1660 //                            newGraph.waitAsync(request);
1661
1662
1663                         }
1664
1665                     }
1666
1667                 } finally {
1668
1669                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1670
1671                 }
1672
1673             }
1674
1675         });
1676
1677     }
1678
1679     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1680
1681         assert (request != null);
1682         assert (procedure != null);
1683
1684         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1685
1686         int sync = notify != null ? thread : -1;
1687
1688         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1689
1690             @Override
1691             public void run(int thread) {
1692
1693                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1694
1695                 ListenerBase listener = getListenerBase(procedure);
1696
1697                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1698
1699                 try {
1700
1701                     if (listener != null) {
1702
1703                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1704
1705 //                        newGraph.waitAsync(request);
1706
1707                     } else {
1708
1709                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1710
1711                         try {
1712
1713                             request.perform(newGraph, wrapper);
1714
1715                         } catch (Throwable t) {
1716
1717                             t.printStackTrace();
1718
1719                         }
1720
1721                     }
1722
1723                 } finally {
1724
1725                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1726
1727                 }
1728
1729             }
1730
1731         });
1732
1733     }
1734
1735     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1736
1737         assert (request != null);
1738         assert (procedure != null);
1739
1740         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1741
1742         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1743
1744             @Override
1745             public void run(int thread) {
1746
1747                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1748
1749                 ListenerBase listener = getListenerBase(procedure);
1750
1751                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1752
1753                 try {
1754
1755                     if (listener != null) {
1756
1757                         try {
1758                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1759                         } catch (DatabaseException e) {
1760                             Logger.defaultLogError(e);
1761                         }
1762                         
1763                         
1764 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1765 //
1766 //                            @Override
1767 //                            public void exception(Throwable t) {
1768 //                                procedure.exception(t);
1769 //                                if(throwable != null) {
1770 //                                    throwable.set(t);
1771 //                                }
1772 //                            }
1773 //
1774 //                            @Override
1775 //                            public void execute(T t) {
1776 //                                if(result != null) result.set(t);
1777 //                                procedure.execute(t);
1778 //                            }
1779 //
1780 //                        }, listener);
1781
1782 //                        newGraph.waitAsync(request);
1783
1784                     } else {
1785
1786 //                        newGraph.state.barrier.inc();
1787
1788                         request.register(newGraph, new Listener<T>() {
1789
1790                             @Override
1791                             public void exception(Throwable t) {
1792                                 if(throwable != null) throwable.set(t);
1793                                 procedure.exception(t);
1794 //                                newGraph.state.barrier.dec();
1795                             }
1796
1797                             @Override
1798                             public void execute(T t) {
1799                                 if(result != null) result.set(t);
1800                                 procedure.execute(t);
1801 //                                newGraph.state.barrier.dec();
1802                             }
1803
1804                             @Override
1805                             public boolean isDisposed() {
1806                                 return true;
1807                             }
1808
1809                         });
1810
1811 //                        newGraph.waitAsync(request);
1812
1813                     }
1814
1815                 } finally {
1816
1817                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1818
1819                 }
1820
1821             }
1822
1823         });
1824
1825     }
1826
1827
1828     @Override
1829     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1830
1831         scheduleRequest(request, procedure, null, null, null);
1832
1833     }
1834
1835     @Override
1836     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1837         assertNotSession();
1838         Semaphore notify = new Semaphore(0);
1839         DataContainer<Throwable> container = new DataContainer<Throwable>();
1840         DataContainer<T> result = new DataContainer<T>();
1841         scheduleRequest(request, procedure, notify, container, result);
1842         acquire(notify, request);
1843         Throwable throwable = container.get();
1844         if(throwable != null) {
1845             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1846             else throw new DatabaseException("Unexpected exception", throwable);
1847         }
1848         return result.get();
1849     }
1850
1851     @Override
1852     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1853         assertNotSession();
1854         Semaphore notify = new Semaphore(0);
1855         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1856         final DataContainer<T> resultContainer = new DataContainer<T>();
1857         scheduleRequest(request, new AsyncProcedure<T>() {
1858             @Override
1859             public void exception(AsyncReadGraph graph, Throwable throwable) {
1860                 exceptionContainer.set(throwable);
1861                 procedure.exception(graph, throwable);
1862             }
1863             @Override
1864             public void execute(AsyncReadGraph graph, T result) {
1865                 resultContainer.set(result);
1866                 procedure.execute(graph, result);
1867             }
1868         }, getListenerBase(procedure), notify);
1869         acquire(notify, request, procedure);
1870         Throwable throwable = exceptionContainer.get();
1871         if (throwable != null) {
1872             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1873             else throw new DatabaseException("Unexpected exception", throwable);
1874         }
1875         return resultContainer.get();
1876     }
1877
1878
1879     @Override
1880     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1881         assertNotSession();
1882         Semaphore notify = new Semaphore(0);
1883         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1884         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1885             @Override
1886             public void exception(AsyncReadGraph graph, Throwable throwable) {
1887                 exceptionContainer.set(throwable);
1888                 procedure.exception(graph, throwable);
1889             }
1890             @Override
1891             public void execute(AsyncReadGraph graph, T result) {
1892                 procedure.execute(graph, result);
1893             }
1894             @Override
1895             public void finished(AsyncReadGraph graph) {
1896                 procedure.finished(graph);
1897             }
1898         }, notify);
1899         acquire(notify, request, procedure);
1900         Throwable throwable = exceptionContainer.get();
1901         if (throwable != null) {
1902             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1903             else throw new DatabaseException("Unexpected exception", throwable);
1904         }
1905         // TODO: implement return value
1906         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1907         return null;
1908     }
1909
1910     @Override
1911     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1912         return syncRequest(request, new ProcedureAdapter<T>());
1913
1914
1915 //        assert(request != null);
1916 //
1917 //        final DataContainer<T> result = new DataContainer<T>();
1918 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1919 //
1920 //        syncRequest(request, new Procedure<T>() {
1921 //
1922 //            @Override
1923 //            public void execute(T t) {
1924 //                result.set(t);
1925 //            }
1926 //
1927 //            @Override
1928 //            public void exception(Throwable t) {
1929 //                exception.set(t);
1930 //            }
1931 //
1932 //        });
1933 //
1934 //        Throwable t = exception.get();
1935 //        if(t != null) {
1936 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1937 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1938 //        }
1939 //
1940 //        return result.get();
1941
1942     }
1943
1944     @Override
1945     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1946         return syncRequest(request, (Procedure<T>)procedure);
1947     }
1948
1949
1950 //    @Override
1951 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1952 //        assertNotSession();
1953 //        Semaphore notify = new Semaphore(0);
1954 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1955 //        DataContainer<T> result = new DataContainer<T>();
1956 //        scheduleRequest(request, procedure, notify, container, result);
1957 //        acquire(notify, request);
1958 //        Throwable throwable = container.get();
1959 //        if(throwable != null) {
1960 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961 //            else throw new DatabaseException("Unexpected exception", throwable);
1962 //        }
1963 //        return result.get();
1964 //    }
1965
1966
1967     @Override
1968     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1969         assertNotSession();
1970         Semaphore notify = new Semaphore(0);
1971         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1972         final DataContainer<T> result = new DataContainer<T>();
1973         scheduleRequest(request, procedure, notify, container, result);
1974         acquire(notify, request);
1975         Throwable throwable = container.get();
1976         if (throwable != null) {
1977             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1978             else throw new DatabaseException("Unexpected exception", throwable);
1979         }
1980         return result.get();
1981     }
1982
1983     @Override
1984     public void syncRequest(Write request) throws DatabaseException  {
1985         assertNotSession();
1986         assertAlive();
1987         Semaphore notify = new Semaphore(0);
1988         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1989         scheduleRequest(request, e -> exception.set(e), notify);
1990         acquire(notify, request);
1991         if(exception.get() != null) throw exception.get();
1992     }
1993
1994     @Override
1995     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1996         assertNotSession();
1997         Semaphore notify = new Semaphore(0);
1998         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1999         final DataContainer<T> result = new DataContainer<T>();
2000         scheduleRequest(request, new Procedure<T>() {
2001
2002             @Override
2003             public void exception(Throwable t) {
2004               exception.set(t);
2005             }
2006
2007             @Override
2008             public void execute(T t) {
2009               result.set(t);
2010             }
2011
2012         }, notify);
2013         acquire(notify, request);
2014         if(exception.get() != null) {
2015             Throwable t = exception.get();
2016             if(t instanceof DatabaseException) throw (DatabaseException)t;
2017             else throw new DatabaseException(t);
2018         }
2019         return result.get();
2020     }
2021
2022     @Override
2023     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2024         assertNotSession();
2025         Semaphore notify = new Semaphore(0);
2026         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2027         final DataContainer<T> result = new DataContainer<T>();
2028         scheduleRequest(request, new Procedure<T>() {
2029
2030             @Override
2031             public void exception(Throwable t) {
2032               exception.set(t);
2033             }
2034
2035             @Override
2036             public void execute(T t) {
2037               result.set(t);
2038             }
2039
2040         }, notify);
2041         acquire(notify, request);
2042         if(exception.get() != null) {
2043             Throwable t = exception.get();
2044             if(t instanceof DatabaseException) throw (DatabaseException)t;
2045             else throw new DatabaseException(t);
2046         }
2047         return result.get();
2048     }
2049
2050     @Override
2051     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2052         assertNotSession();
2053         Semaphore notify = new Semaphore(0);
2054         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2055         final DataContainer<T> result = new DataContainer<T>();
2056         scheduleRequest(request, new Procedure<T>() {
2057
2058             @Override
2059             public void exception(Throwable t) {
2060               exception.set(t);
2061             }
2062
2063             @Override
2064             public void execute(T t) {
2065               result.set(t);
2066             }
2067
2068         }, notify);
2069         acquire(notify, request);
2070         if(exception.get() != null) {
2071             Throwable t = exception.get();
2072             if(t instanceof DatabaseException) throw (DatabaseException)t;
2073             else throw new DatabaseException(t);
2074         }
2075         return result.get();
2076     }
2077
2078     @Override
2079     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2080         assertNotSession();
2081         Semaphore notify = new Semaphore(0);
2082         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083         scheduleRequest(request, e -> exception.set(e), notify);
2084         acquire(notify, request);
2085         if(exception.get() != null) throw exception.get();
2086     }
2087
2088     @Override
2089     public void syncRequest(WriteOnly request) throws DatabaseException  {
2090         assertNotSession();
2091         assertAlive();
2092         Semaphore notify = new Semaphore(0);
2093         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2094         scheduleRequest(request, e -> exception.set(e), notify);
2095         acquire(notify, request);
2096         if(exception.get() != null) throw exception.get();
2097     }
2098
2099     /*
2100      *
2101      * GraphRequestProcessor interface
2102      */
2103
2104     @Override
2105     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2106
2107         scheduleRequest(request, procedure, null, null);
2108
2109     }
2110
2111     @Override
2112     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2113
2114         scheduleRequest(request, procedure, null);
2115
2116     }
2117
2118     @Override
2119     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2120
2121         scheduleRequest(request, procedure, null, null, null);
2122
2123     }
2124
2125     @Override
2126     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2127
2128         scheduleRequest(request, callback, null);
2129
2130     }
2131
2132     @Override
2133     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2134
2135         scheduleRequest(request, procedure, null);
2136
2137     }
2138
2139     @Override
2140     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2141
2142         scheduleRequest(request, procedure, null);
2143
2144     }
2145
2146     @Override
2147     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2148
2149         scheduleRequest(request, procedure, null);
2150
2151     }
2152
2153     @Override
2154     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2155
2156         scheduleRequest(request, callback, null);
2157
2158     }
2159
2160     @Override
2161     public void asyncRequest(final Write r) {
2162         asyncRequest(r, null);
2163     }
2164
2165     @Override
2166     public void asyncRequest(final DelayedWrite r) {
2167         asyncRequest(r, null);
2168     }
2169
2170     @Override
2171     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2172
2173         scheduleRequest(request, callback, null);
2174
2175     }
2176
2177     @Override
2178     public void asyncRequest(final WriteOnly request) {
2179
2180         asyncRequest(request, null);
2181
2182     }
2183
2184     @Override
2185     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2186         r.request(this, procedure);
2187     }
2188
2189     @Override
2190     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2191         r.request(this, procedure);
2192     }
2193
2194     @Override
2195     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2196         r.request(this, procedure);
2197     }
2198
2199     @Override
2200     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2201         r.request(this, procedure);
2202     }
2203
2204     @Override
2205     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2206         r.request(this, procedure);
2207     }
2208
2209     @Override
2210     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2211         r.request(this, procedure);
2212     }
2213
2214     @Override
2215     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2216         return r.request(this);
2217     }
2218
2219     @Override
2220     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2221         return r.request(this);
2222     }
2223
2224     @Override
2225     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2226         r.request(this, procedure);
2227     }
2228
2229     @Override
2230     public <T> void async(WriteInterface<T> r) {
2231         r.request(this, new ProcedureAdapter<T>());
2232     }
2233
2234     //@Override
2235     public void incAsync() {
2236         state.incAsync();
2237     }
2238
2239     //@Override
2240     public void decAsync() {
2241         state.decAsync();
2242     }
2243
2244     public long getCluster(ResourceImpl resource) {
2245         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2246         return cluster.getClusterId();
2247     }
2248
2249     public long getCluster(int id) {
2250         if (clusterTable == null)
2251             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2252         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2253     }
2254
2255     public ResourceImpl getResource(int id) {
2256         return new ResourceImpl(resourceSupport, id);
2257     }
2258
2259     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2260         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2261         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2262         int key = proxy.getClusterKey();
2263         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2264         return new ResourceImpl(resourceSupport, resourceKey);
2265     }
2266
2267     public ResourceImpl getResource2(int id) {
2268         assert (id != 0);
2269         return new ResourceImpl(resourceSupport, id);
2270     }
2271
2272     final public int getId(ResourceImpl impl) {
2273         return impl.id;
2274     }
2275
2276     public static final Charset UTF8 = Charset.forName("utf-8");
2277
2278
2279
2280
2281     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2282         for(TransientGraph g : support.providers) {
2283             if(g.isPending(subject)) return false;
2284         }
2285         return true;
2286     }
2287
2288     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2289         for(TransientGraph g : support.providers) {
2290             if(g.isPending(subject, predicate)) return false;
2291         }
2292         return true;
2293     }
2294
2295     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2296
2297         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2298
2299             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2300
2301             @Override
2302             public void accept(ReadGraphImpl graph) {
2303                 if(ready.decrementAndGet() == 0) {
2304                     runnable.accept(graph);
2305                 }
2306             }
2307
2308         };
2309
2310         for(TransientGraph g : support.providers) {
2311             if(g.isPending(subject)) {
2312                 try {
2313                     g.load(graph, subject, composite);
2314                 } catch (DatabaseException e) {
2315                     e.printStackTrace();
2316                 }
2317             } else {
2318                 composite.accept(graph);
2319             }
2320         }
2321
2322         composite.accept(graph);
2323
2324     }
2325
2326     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2327
2328         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2329
2330             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2331
2332             @Override
2333             public void accept(ReadGraphImpl graph) {
2334                 if(ready.decrementAndGet() == 0) {
2335                     runnable.accept(graph);
2336                 }
2337             }
2338
2339         };
2340
2341         for(TransientGraph g : support.providers) {
2342             if(g.isPending(subject, predicate)) {
2343                 try {
2344                     g.load(graph, subject, predicate, composite);
2345                 } catch (DatabaseException e) {
2346                     e.printStackTrace();
2347                 }
2348             } else {
2349                 composite.accept(graph);
2350             }
2351         }
2352
2353         composite.accept(graph);
2354
2355     }
2356
2357 //    void dumpHeap() {
2358 //
2359 //        try {
2360 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2361 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2362 //                    "d:/heap" + flushCounter + ".txt", true);
2363 //        } catch (IOException e) {
2364 //            e.printStackTrace();
2365 //        }
2366 //
2367 //    }
2368
2369     void fireReactionsToSynchronize(ChangeSet cs) {
2370
2371         // Do not fire empty events
2372         if (cs.isEmpty())
2373             return;
2374
2375         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2376 //        g.state.barrier.inc();
2377
2378         try {
2379
2380             if (!cs.isEmpty()) {
2381                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2382                 for (ChangeListener l : changeListeners2) {
2383                     try {
2384                         l.graphChanged(e2);
2385                     } catch (Exception ex) {
2386                         ex.printStackTrace();
2387                     }
2388                 }
2389             }
2390
2391         } finally {
2392
2393 //            g.state.barrier.dec();
2394 //            g.waitAsync(null);
2395
2396         }
2397
2398     }
2399
2400     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2401         try {
2402
2403             // Do not fire empty events
2404             if (cs2.isEmpty())
2405                 return;
2406
2407 //            graph.restart();
2408 //            graph.state.barrier.inc();
2409
2410             try {
2411
2412                 if (!cs2.isEmpty()) {
2413
2414                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2415                     for (ChangeListener l : changeListeners2) {
2416                         try {
2417                             l.graphChanged(e2);
2418 //                            System.out.println("changelistener " + l);
2419                         } catch (Exception ex) {
2420                             ex.printStackTrace();
2421                         }
2422                     }
2423
2424                 }
2425
2426             } finally {
2427
2428 //                graph.state.barrier.dec();
2429 //                graph.waitAsync(null);
2430
2431             }
2432         } catch (Throwable t) {
2433             t.printStackTrace();
2434
2435         }
2436     }
2437     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2438
2439         try {
2440
2441             // Do not fire empty events
2442             if (cs2.isEmpty())
2443                 return;
2444
2445             // Do not fire on virtual requests
2446             if(graph.getProvider() != null)
2447                 return;
2448
2449             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2450
2451             try {
2452
2453                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2454                 for (ChangeListener l : metadataListeners) {
2455                     try {
2456                         l.graphChanged(e2);
2457                     } catch (Throwable ex) {
2458                         ex.printStackTrace();
2459                     }
2460                 }
2461
2462             } finally {
2463
2464             }
2465
2466         } catch (Throwable t) {
2467             t.printStackTrace();
2468         }
2469
2470     }
2471
2472
2473     /*
2474      * (non-Javadoc)
2475      *
2476      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2477      */
2478     @Override
2479     public <T> T getService(Class<T> api) {
2480         T t = peekService(api);
2481         if (t == null) {
2482             if (state.isClosed())
2483                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2484             throw new ServiceNotFoundException(this, api);
2485         }
2486         return t;
2487     }
2488
    /**
     * Supplies the object that {@code peekService} hands out when it is asked
     * for {@code ServerInformation.class}.
     *
     * @return the server information; how it is obtained is left to the implementing subclass
     */
    protected abstract ServerInformation getCachedServerInformation();
2493
2494         private Class<?> serviceKey1 = null;
2495         private Class<?> serviceKey2 = null;
2496         private Object service1 = null;
2497         private Object service2 = null;
2498
2499     /*
2500      * (non-Javadoc)
2501      *
2502      * @see
2503      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2504      */
2505     @SuppressWarnings("unchecked")
2506     @Override
2507     public synchronized <T> T peekService(Class<T> api) {
2508
2509                 if(serviceKey1 == api) {
2510                         return (T)service1;
2511                 } else if (serviceKey2 == api) {
2512                         // Promote this key
2513                         Object result = service2;
2514                         service2 = service1;
2515                         serviceKey2 = serviceKey1;
2516                         service1 = result;
2517                         serviceKey1 = api;
2518                         return (T)result;
2519                 }
2520
2521         if (Layer0.class == api)
2522                 return (T) L0;
2523         if (ServerInformation.class == api)
2524             return (T) getCachedServerInformation();
2525         else if (WriteGraphImpl.class == api)
2526             return (T) writeState.getGraph();
2527         else if (ClusterBuilder.class == api)
2528             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2529         else if (ClusterBuilderFactory.class == api)
2530             return (T)new ClusterBuilderFactoryImpl(this);
2531
2532                 service2 = service1;
2533                 serviceKey2 = serviceKey1;
2534
2535         service1 = serviceLocator.peekService(api);
2536         serviceKey1 = api;
2537
2538         return (T)service1;
2539
2540     }
2541
2542     /*
2543      * (non-Javadoc)
2544      *
2545      * @see
2546      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2547      */
2548     @Override
2549     public boolean hasService(Class<?> api) {
2550         return serviceLocator.hasService(api);
2551     }
2552
2553     /**
2554      * @param api the api that must be implemented by the specified service
2555      * @param service the service implementation
2556      */
2557     @Override
2558     public <T> void registerService(Class<T> api, T service) {
2559         if(Layer0.class == api) {
2560                 L0 = (Layer0)service;
2561                 return;
2562         }
2563         serviceLocator.registerService(api, service);
2564         if (TransactionPolicySupport.class == api) {
2565             transactionPolicy = (TransactionPolicySupport)service;
2566             state.resetTransactionPolicy();
2567         }
2568         if (api == serviceKey1)
2569                 service1 = service;
2570         else if (api == serviceKey2)
2571                 service2 = service;
2572     }
2573
2574     // ----------------
2575     // SessionMonitor
2576     // ----------------
2577
2578     void fireSessionVariableChange(String variable) {
2579         for (MonitorHandler h : monitorHandlers) {
2580             MonitorContext ctx = monitorContexts.getLeft(h);
2581             assert ctx != null;
2582             // SafeRunner functionality repeated here to avoid dependency.
2583             try {
2584                 h.valuesChanged(ctx);
2585             } catch (Exception e) {
2586                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2587             } catch (LinkageError e) {
2588                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2589             }
2590         }
2591     }
2592
2593
2594     class ResourceSerializerImpl implements ResourceSerializer {
2595
2596         public long createRandomAccessId(int id) throws DatabaseException {
2597             if(id < 0)
2598                 return id;
2599             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2600             long cluster = getCluster(id);
2601             if (0 == cluster)
2602                 return 0; // Better to return 0 then invalid id.
2603             long result = ClusterTraitsBase.createResourceId(cluster, index);
2604             return result;
2605         }
2606
2607         @Override
2608         public long getRandomAccessId(Resource resource) throws DatabaseException {
2609
2610             ResourceImpl resourceImpl = (ResourceImpl) resource;
2611             return createRandomAccessId(resourceImpl.id);
2612
2613         }
2614
2615         @Override
2616         public int getTransientId(Resource resource) throws DatabaseException {
2617
2618             ResourceImpl resourceImpl = (ResourceImpl) resource;
2619             return resourceImpl.id;
2620
2621         }
2622
2623         @Override
2624         public String createRandomAccessId(Resource resource)
2625         throws InvalidResourceReferenceException {
2626
2627             if(resource == null) throw new IllegalArgumentException();
2628
2629             ResourceImpl resourceImpl = (ResourceImpl) resource;
2630
2631             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2632
2633             int r;
2634             try {
2635                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2636             } catch (DatabaseException e1) {
2637                 throw new InvalidResourceReferenceException(e1);
2638             }
2639             try {
2640                 // Serialize as '<resource index>_<cluster id>'
2641                 return "" + r + "_" + getCluster(resourceImpl);
2642             } catch (Throwable e) {
2643                 e.printStackTrace();
2644                 throw new InvalidResourceReferenceException(e);
2645             } finally {
2646             }
2647         }
2648
2649         public int getTransientId(long serialized) throws DatabaseException {
2650             if (serialized <= 0)
2651                 return (int)serialized;
2652             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2653             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2654             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2655             if (cluster == null)
2656                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2657             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2658             return key;
2659         }
2660
2661         @Override
2662         public Resource getResource(long randomAccessId) throws DatabaseException {
2663             return getResourceByKey(getTransientId(randomAccessId));
2664         }
2665
2666         @Override
2667         public Resource getResource(int transientId) throws DatabaseException {
2668             return getResourceByKey(transientId);
2669         }
2670
2671         @Override
2672         public Resource getResource(String randomAccessId)
2673         throws InvalidResourceReferenceException {
2674             try {
2675                 int i = randomAccessId.indexOf('_');
2676                 if (i == -1)
2677                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2678                             + randomAccessId + "'");
2679                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2680                 if(r < 0) return getResourceByKey(r);
2681                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2682                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2683                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2684                 if (cluster.hasResource(key, clusterTranslator))
2685                     return getResourceByKey(key);
2686             } catch (InvalidResourceReferenceException e) {
2687                 throw e;
2688             } catch (NumberFormatException e) {
2689                 throw new InvalidResourceReferenceException(e);
2690             } catch (Throwable e) {
2691                 e.printStackTrace();
2692                 throw new InvalidResourceReferenceException(e);
2693             } finally {
2694             }
2695             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2696         }
2697
2698         @Override
2699         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2700             try {
2701                 return true;
2702             } catch (Throwable e) {
2703                 e.printStackTrace();
2704                 throw new InvalidResourceReferenceException(e);
2705             } finally {
2706             }
2707         }
2708     }
2709
2710     // Copied from old SessionImpl
2711
2712     void check() {
2713         if (state.isClosed())
2714             throw new Error("Session closed.");
2715     }
2716
2717
2718
2719     public ResourceImpl getNewResource(long clusterId) {
2720         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2721         int newId;
2722         try {
2723             newId = cluster.createResource(clusterTranslator);
2724         } catch (DatabaseException e) {
2725             Logger.defaultLogError(e);
2726             return null;
2727         }
2728         return new ResourceImpl(resourceSupport, newId);
2729     }
2730
2731     public ResourceImpl getNewResource(Resource clusterSet)
2732     throws DatabaseException {
2733         long resourceId = clusterSet.getResourceId();
2734         Long clusterId = clusterSetsSupport.get(resourceId);
2735         if (null == clusterId)
2736             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2737         if (Constants.NewClusterId == clusterId) {
2738             clusterId = getService(ClusteringSupport.class).createCluster();
2739             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2740                 createdClusters.add(clusterId);
2741             clusterSetsSupport.put(resourceId, clusterId);
2742             return getNewResource(clusterId);
2743         } else {
2744             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2745             ResourceImpl result;
2746             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2747                 clusterId = getService(ClusteringSupport.class).createCluster();
2748                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2749                     createdClusters.add(clusterId);
2750                 clusterSetsSupport.put(resourceId, clusterId);
2751                 return getNewResource(clusterId);
2752             } else {
2753                 result = getNewResource(clusterId);
2754                 int resultKey = querySupport.getId(result);
2755                 long resultCluster = querySupport.getClusterId(resultKey);
2756                 if (clusterId != resultCluster)
2757                     clusterSetsSupport.put(resourceId, resultCluster);
2758                 return result;
2759             }
2760         }
2761     }
2762
2763     public void getNewClusterSet(Resource clusterSet)
2764     throws DatabaseException {
2765         if (DEBUG)
2766             System.out.println("new cluster set=" + clusterSet);
2767         long resourceId = clusterSet.getResourceId();
2768         if (clusterSetsSupport.containsKey(resourceId))
2769             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2770         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2771     }
2772     public boolean containsClusterSet(Resource clusterSet)
2773     throws ServiceException {
2774         long resourceId = clusterSet.getResourceId();
2775         return clusterSetsSupport.containsKey(resourceId);
2776     }
2777     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2778         Resource r = defaultClusterSet;
2779         defaultClusterSet = clusterSet;
2780         return r;
2781     }
2782     void printDiagnostics() {
2783
2784         if (DIAGNOSTICS) {
2785
2786             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2787             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2788             for(ClusterI cluster : clusterTable.getClusters()) {
2789                 try {
2790                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2791                         residentClusters.set(residentClusters.get() + 1);
2792                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2793                     }
2794                 } catch (DatabaseException e) {
2795                     Logger.defaultLogError(e);
2796                 }
2797             }
2798
2799             System.out.println("--------------------------------");
2800             System.out.println("Cluster information:");
2801             System.out.println("-amount of resident clusters=" + residentClusters.get());
2802             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2803
2804             for(ClusterI cluster : clusterTable.getClusters()) {
2805                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2806                 try {
2807                     if (!cluster.isLoaded())
2808                         System.out.println("not loaded]");
2809                     else if (cluster.isEmpty())
2810                         System.out.println("is empty]");
2811                     else {
2812                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2813                         System.out.print(",references=" + cluster.getReferenceCount());
2814                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2815                     }
2816                 } catch (DatabaseException e) {
2817                     Logger.defaultLogError(e);
2818                     System.out.println("is corrupted]");
2819                 }
2820             }
2821
2822             queryProvider2.printDiagnostics();
2823             System.out.println("--------------------------------");
2824         }
2825
2826     }
2827
2828     public void fireStartReadTransaction() {
2829
2830         if (DIAGNOSTICS)
2831             System.out.println("StartReadTransaction");
2832
2833         for (SessionEventListener listener : eventListeners) {
2834             try {
2835                 listener.readTransactionStarted();
2836             } catch (Throwable t) {
2837                 t.printStackTrace();
2838             }
2839         }
2840
2841     }
2842
2843     public void fireFinishReadTransaction() {
2844
2845         if (DIAGNOSTICS)
2846             System.out.println("FinishReadTransaction");
2847
2848         for (SessionEventListener listener : eventListeners) {
2849             try {
2850                 listener.readTransactionFinished();
2851             } catch (Throwable t) {
2852                 t.printStackTrace();
2853             }
2854         }
2855
2856     }
2857
2858     public void fireStartWriteTransaction() {
2859
2860         if (DIAGNOSTICS)
2861             System.out.println("StartWriteTransaction");
2862
2863         for (SessionEventListener listener : eventListeners) {
2864             try {
2865                 listener.writeTransactionStarted();
2866             } catch (Throwable t) {
2867                 t.printStackTrace();
2868             }
2869         }
2870
2871     }
2872
2873     public void fireFinishWriteTransaction() {
2874
2875         if (DIAGNOSTICS)
2876             System.out.println("FinishWriteTransaction");
2877
2878         for (SessionEventListener listener : eventListeners) {
2879             try {
2880                 listener.writeTransactionFinished();
2881             } catch (Throwable t) {
2882                 t.printStackTrace();
2883             }
2884         }
2885
2886         Indexing.resetDependenciesIndexingDisabled();
2887
2888     }
2889
2890     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2891         throw new Error("Not supported at the moment.");
2892     }
2893
2894     State getState() {
2895         return state;
2896     }
2897
2898     ClusterTable getClusterTable() {
2899         return clusterTable;
2900     }
2901
2902     public GraphSession getGraphSession() {
2903         return graphSession;
2904     }
2905
2906     public QueryProcessor getQueryProvider2() {
2907         return queryProvider2;
2908     }
2909
2910     ClientChangesImpl getClientChanges() {
2911         return clientChanges;
2912     }
2913
2914     boolean getWriteOnly() {
2915         return writeOnly;
2916     }
2917
2918     static int counter = 0;
2919
2920     public void onClusterLoaded(long clusterId) {
2921
2922         clusterTable.updateSize();
2923
2924     }
2925
2926
2927
2928
2929     /*
2930      * Implementation of the interface RequestProcessor
2931      */
2932
2933     @Override
2934     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2935
2936         assertNotSession();
2937         assertAlive();
2938
2939         assert(request != null);
2940
2941         final DataContainer<T> result = new DataContainer<T>();
2942         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2943
2944         syncRequest(request, new AsyncProcedure<T>() {
2945
2946             @Override
2947             public void execute(AsyncReadGraph graph, T t) {
2948                 result.set(t);
2949             }
2950
2951             @Override
2952             public void exception(AsyncReadGraph graph, Throwable t) {
2953                 exception.set(t);
2954             }
2955
2956         });
2957
2958         Throwable t = exception.get();
2959         if(t != null) {
2960             if(t instanceof DatabaseException) throw (DatabaseException)t;
2961             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2962         }
2963
2964         return result.get();
2965
2966     }
2967
2968 //    @Override
2969 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2970 //        assertNotSession();
2971 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2972 //    }
2973
2974     @Override
2975     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2976         assertNotSession();
2977         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2978     }
2979
2980     @Override
2981     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2982         assertNotSession();
2983         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2984     }
2985
2986     @Override
2987     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2988         assertNotSession();
2989         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2990     }
2991
2992     @Override
2993     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2994         assertNotSession();
2995         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2996     }
2997
2998     @Override
2999     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
3000
3001         assertNotSession();
3002
3003         assert(request != null);
3004
3005         final DataContainer<T> result = new DataContainer<T>();
3006         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3007
3008         syncRequest(request, new AsyncProcedure<T>() {
3009
3010             @Override
3011             public void execute(AsyncReadGraph graph, T t) {
3012                 result.set(t);
3013             }
3014
3015             @Override
3016             public void exception(AsyncReadGraph graph, Throwable t) {
3017                 exception.set(t);
3018             }
3019
3020         });
3021
3022         Throwable t = exception.get();
3023         if(t != null) {
3024             if(t instanceof DatabaseException) throw (DatabaseException)t;
3025             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3026         }
3027
3028         return result.get();
3029
3030     }
3031
3032     @Override
3033     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3034         assertNotSession();
3035         return syncRequest(request, (AsyncProcedure<T>)procedure);
3036     }
3037
3038     @Override
3039     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3040         assertNotSession();
3041         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3042     }
3043
3044     @Override
3045     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3046         assertNotSession();
3047         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3048     }
3049
3050     @Override
3051     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3052         assertNotSession();
3053         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3054     }
3055
3056     @Override
3057     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3058         assertNotSession();
3059         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3060     }
3061
3062     @Override
3063     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3064
3065         assertNotSession();
3066
3067         assert(request != null);
3068
3069         final ArrayList<T> result = new ArrayList<T>();
3070         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3071
3072         syncRequest(request, new AsyncMultiProcedure<T>() {
3073
3074             @Override
3075             public void execute(AsyncReadGraph graph, T t) {
3076                 synchronized(result) {
3077                     result.add(t);
3078                 }
3079             }
3080
3081             @Override
3082             public void finished(AsyncReadGraph graph) {
3083             }
3084
3085             @Override
3086             public void exception(AsyncReadGraph graph, Throwable t) {
3087                 exception.set(t);
3088             }
3089
3090
3091         });
3092
3093         Throwable t = exception.get();
3094         if(t != null) {
3095             if(t instanceof DatabaseException) throw (DatabaseException)t;
3096             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3097         }
3098
3099         return result;
3100
3101     }
3102
3103     @Override
3104     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3105         assertNotSession();
3106         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3107     }
3108
3109     @Override
3110     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3111         assertNotSession();
3112         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3113     }
3114
3115     @Override
3116     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3117         assertNotSession();
3118         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3119     }
3120
3121     @Override
3122     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3123         assertNotSession();
3124         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3125     }
3126
3127     @Override
3128     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3129         assertNotSession();
3130         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3131     }
3132
3133     @Override
3134     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3135
3136         assertNotSession();
3137
3138         assert(request != null);
3139
3140         final ArrayList<T> result = new ArrayList<T>();
3141         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3142
3143         syncRequest(request, new AsyncMultiProcedure<T>() {
3144
3145             @Override
3146             public void execute(AsyncReadGraph graph, T t) {
3147                 synchronized(result) {
3148                     result.add(t);
3149                 }
3150             }
3151
3152             @Override
3153             public void finished(AsyncReadGraph graph) {
3154             }
3155
3156             @Override
3157             public void exception(AsyncReadGraph graph, Throwable t) {
3158                 exception.set(t);
3159             }
3160
3161
3162         });
3163
3164         Throwable t = exception.get();
3165         if(t != null) {
3166             if(t instanceof DatabaseException) throw (DatabaseException)t;
3167             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3168         }
3169
3170         return result;
3171
3172     }
3173
3174     @Override
3175     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3176         assertNotSession();
3177         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3178     }
3179
3180     @Override
3181     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3182         assertNotSession();
3183         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3184     }
3185
3186     @Override
3187     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3188         assertNotSession();
3189        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3190     }
3191
3192     @Override
3193     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3194         assertNotSession();
3195         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3196     }
3197
3198     @Override
3199     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3200         assertNotSession();
3201         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3202     }
3203
3204
3205     @Override
3206     public <T> void asyncRequest(Read<T> request) {
3207
3208         asyncRequest(request, new ProcedureAdapter<T>() {
3209             @Override
3210             public void exception(Throwable t) {
3211                 t.printStackTrace();
3212             }
3213         });
3214
3215     }
3216
3217     @Override
3218     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3219         asyncRequest(request, (AsyncProcedure<T>)procedure);
3220     }
3221
3222     @Override
3223     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3224         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3225     }
3226
3227     @Override
3228     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3229         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3230     }
3231
3232     @Override
3233     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3234         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3235     }
3236
3237     @Override
3238     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3239         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3240     }
3241
3242     @Override
3243     final public <T> void asyncRequest(final AsyncRead<T> request) {
3244
3245         assert(request != null);
3246
3247         asyncRequest(request, new ProcedureAdapter<T>() {
3248             @Override
3249             public void exception(Throwable t) {
3250                 t.printStackTrace();
3251             }
3252         });
3253
3254     }
3255
    /**
     * Schedules an asynchronous read that reports to an {@code AsyncListener}.
     * <p>
     * The listener is passed to {@code scheduleRequest} twice and the final argument is
     * {@code null}. NOTE(review): presumably the two positions are "procedure" and
     * "listener" roles and the {@code null} is an optional notification object; confirm
     * against {@code scheduleRequest}, which is declared outside this section.
     *
     * @param request the read to schedule
     * @param procedure the listener receiving the outcome
     */
    @Override
    public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
        scheduleRequest(request, procedure, procedure, null);
    }
3260
3261     @Override
3262     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3263         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3264     }
3265
3266     @Override
3267     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3268         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3269     }
3270
3271     @Override
3272     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3273         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3274     }
3275
3276     @Override
3277     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3278         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3279     }
3280
3281     @Override
3282     public <T> void asyncRequest(MultiRead<T> request) {
3283
3284         assert(request != null);
3285
3286         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3287             @Override
3288             public void exception(AsyncReadGraph graph, Throwable t) {
3289                 t.printStackTrace();
3290             }
3291         });
3292
3293     }
3294
3295     @Override
3296     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3297         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3302         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3303     }
3304
3305     @Override
3306     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3307         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3308     }
3309
3310     @Override
3311     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3312         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3313     }
3314
3315     @Override
3316     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3317         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3318     }
3319
3320     @Override
3321     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3322
3323         assert(request != null);
3324
3325         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3326             @Override
3327             public void exception(AsyncReadGraph graph, Throwable t) {
3328                 t.printStackTrace();
3329             }
3330         });
3331
3332     }
3333
3334     @Override
3335     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3336         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3337     }
3338
3339     @Override
3340     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3341         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3346         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3347     }
3348
3349     @Override
3350     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3351         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3352     }
3353
3354     @Override
3355     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3356         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3357     }
3358
    /**
     * Unimplemented: synchronous multi-read reporting to an {@code AsyncMultiProcedure}.
     * <p>
     * After the session-thread check this method always throws
     * {@code Error("Not implemented!")}. The other {@code syncRequest(MultiRead, ...)}
     * overloads in this class that delegate here therefore fail the same way.
     *
     * @param arg0 the multi-read (unused)
     * @param arg1 the procedure (unused)
     * @return never returns normally
     * @throws DatabaseException if the calling thread is a session thread
     */
    @Override
    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
        assertNotSession();
        throw new Error("Not implemented!");
    }
3364
    /**
     * Unimplemented: asynchronous multi-read reporting to an {@code AsyncMultiProcedure}.
     * <p>
     * Always throws {@code Error("Not implemented!")}. The other
     * {@code asyncRequest(MultiRead, ...)} overloads in this class that delegate here
     * therefore fail the same way.
     *
     * @param arg0 the multi-read (unused)
     * @param arg1 the procedure (unused)
     */
    @Override
    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
        throw new Error("Not implemented!");
    }
3369
3370     @Override
3371     final public <T> void asyncRequest(final ExternalRead<T> request) {
3372
3373         assert(request != null);
3374
3375         asyncRequest(request, new ProcedureAdapter<T>() {
3376             @Override
3377             public void exception(Throwable t) {
3378                 t.printStackTrace();
3379             }
3380         });
3381
3382     }
3383
3384     @Override
3385     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3386         asyncRequest(request, (Procedure<T>)procedure);
3387     }
3388
3389
3390
3391     void check(Throwable t) throws DatabaseException {
3392         if(t != null) {
3393             if(t instanceof DatabaseException) throw (DatabaseException)t;
3394             else throw new DatabaseException("Unexpected exception", t);
3395         }
3396     }
3397
3398     void check(DataContainer<Throwable> container) throws DatabaseException {
3399         Throwable t = container.get();
3400         if(t != null) {
3401             if(t instanceof DatabaseException) throw (DatabaseException)t;
3402             else throw new DatabaseException("Unexpected exception", t);
3403         }
3404     }
3405
3406
3407
3408
3409
3410
3411     boolean sameProvider(Write request) {
3412         if(writeState.getGraph().provider != null) {
3413             return writeState.getGraph().provider.equals(request.getProvider());
3414         } else {
3415             return request.getProvider() == null;
3416         }
3417     }
3418
3419     boolean plainWrite(WriteGraphImpl graph) {
3420         if(graph == null) return false;
3421         if(graph.writeSupport.writeOnly()) return false;
3422         return true;
3423     }
3424
3425
    /**
     * Thread group named "Session Thread Group".
     * NOTE(review): presumably the group that session worker threads are created in;
     * its users are outside this section, so confirm before relying on that.
     */
    public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3427     private void assertNotSession() throws DatabaseException {
3428         Thread current = Thread.currentThread();
3429         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3430     }
3431
3432     void assertAlive() {
3433         if (!state.isAlive())
3434             throw new RuntimeDatabaseException("Session has been shut down.");
3435     }
3436
    /**
     * Returns the value of a resource as a stream.
     * <p>
     * The resource is translated to its id via {@code querySupport.getId} and the
     * stream is obtained from {@code querySupport.getValueStream}.
     *
     * @param graph the read graph passed through to the query support
     * @param resource the resource whose value is requested
     * @return the stream supplied by the query support
     */
    public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValueStream(graph, querySupport.getId(resource));
    }
3440
    /**
     * Returns the value of a resource as a byte array.
     * <p>
     * The resource is translated to its id via {@code querySupport.getId} and the
     * bytes are obtained from {@code querySupport.getValue}.
     *
     * @param graph the read graph passed through to the query support
     * @param resource the resource whose value is requested
     * @return the bytes supplied by the query support
     */
    public byte[] getValue(ReadGraphImpl graph, Resource resource) {
        return querySupport.getValue(graph, querySupport.getId(resource));
    }
3444
    /**
     * Acquires the semaphore on behalf of a request that has no associated procedure.
     * <p>
     * Equivalent to calling the three-argument {@code acquire} with a {@code null}
     * procedure.
     *
     * @param semaphore the semaphore to acquire
     * @param request the request being waited for; used only in diagnostic output
     * @throws DatabaseException declared by the delegated overload
     */
    <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
        acquire(semaphore, request, null);
    }
3448
3449     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3450         assertAlive();
3451         try {
3452             long tryCount = 0;
3453             // Loop until the semaphore is acquired reporting requests that take a long time.
3454             while (true) {
3455
3456                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3457                     // Acquired OK.
3458                     return;
3459                 } else {
3460                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3461                         ++tryCount;
3462                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3463                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3464                                 * tryCount;
3465                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3466                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3467                     }
3468                 }
3469
3470                 assertAlive();
3471
3472             }
3473         } catch (InterruptedException e) {
3474             e.printStackTrace();
3475             // FIXME: Should perhaps do something else in this case ??
3476         }
3477     }
3478
3479
3480
3481 //    @Override
3482 //    public boolean holdOnToTransactionAfterCancel() {
3483 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3484 //    }
3485 //
3486 //    @Override
3487 //    public boolean holdOnToTransactionAfterCommit() {
3488 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3489 //    }
3490 //
3491 //    @Override
3492 //    public boolean holdOnToTransactionAfterRead() {
3493 //        return transactionPolicy.holdOnToTransactionAfterRead();
3494 //    }
3495 //
3496 //    @Override
3497 //    public void onRelinquish() {
3498 //        transactionPolicy.onRelinquish();
3499 //    }
3500 //
3501 //    @Override
3502 //    public void onRelinquishDone() {
3503 //        transactionPolicy.onRelinquishDone();
3504 //    }
3505 //
3506 //    @Override
3507 //    public void onRelinquishError() {
3508 //        transactionPolicy.onRelinquishError();
3509 //    }
3510
3511
    /**
     * Returns this object, which itself is the session.
     *
     * @return {@code this}
     */
    @Override
    public Session getSession() {
        return this;
    }
3516
3517
    /**
     * Subclass hook taking a virtual graph and returning a virtual graph.
     * NOTE(review): presumably resolves the effective provider for the given virtual
     * graph; the contract is defined by the implementations, confirm there.
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);

    /**
     * Subclass hook that supplies a new resource.
     *
     * @throws DatabaseException if the implementation cannot provide a resource
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3520
    /**
     * Forwards a "ceased" notification for the given thread to the request manager.
     *
     * @param thread thread identifier passed on unchanged
     *        (NOTE(review): presumably a query thread index; confirm against the
     *        request manager)
     */
    public void ceased(int thread) {

        requestManager.ceased(thread);

    }
3526
    /**
     * Returns the number of query threads, currently fixed at 16.
     *
     * @return the query thread count; must be a power of two
     */
    public int getAmountOfQueryThreads() {
        // This must be a power of two
        return 16;
        // Disabled alternative that derives the count from the processor count:
//        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
    }
3532
3533     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3534
3535         return new ResourceImpl(resourceSupport, key);
3536
3537     }
3538
    /**
     * Switches the session into write-only mode by setting the {@code writeOnly} flag.
     */
    public void acquireWriteOnly() {
        writeOnly = true;
    }
3542
    /**
     * Leaves write-only mode: clears the {@code writeOnly} flag and then calls
     * {@code releaseWrite} on the query provider.
     *
     * @param graph the graph passed on to {@code queryProvider2.releaseWrite}
     */
    public void releaseWriteOnly(ReadGraphImpl graph) {
        writeOnly = false;
        queryProvider2.releaseWrite(graph);
    }
3547
3548     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3549
3550         long start = System.nanoTime();
3551
3552         while(dirtyPrimitives) {
3553             dirtyPrimitives = false;
3554             getQueryProvider2().performDirtyUpdates(writer);
3555             getQueryProvider2().performScheduledUpdates(writer);
3556         }
3557
3558         fireMetadataListeners(writer, clientChanges);
3559
3560         if(DebugPolicy.PERFORMANCE_DATA) {
3561             long end = System.nanoTime();
3562             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3563         }
3564
3565     }
3566
3567     void removeTemporaryData() {
3568         File platform = Platform.getLocation().toFile();
3569         File tempFiles = new File(platform, "tempFiles");
3570         File temp = new File(tempFiles, "db");
3571         if(!temp.exists()) 
3572             return;
3573         try {
3574             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3575                 @Override
3576                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3577                     try {
3578                         Files.delete(file);
3579                     } catch (IOException e) {
3580                         Logger.defaultLogError(e);
3581                     }
3582                     return FileVisitResult.CONTINUE;
3583                 }
3584             });
3585         } catch (IOException e) {
3586             Logger.defaultLogError(e);
3587         }
3588     }
3589
3590     public void handleCreatedClusters() {
3591         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3592             createdClusters.forEach(new TLongProcedure() {
3593
3594                 @Override
3595                 public boolean execute(long value) {
3596                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3597                     cluster.setImmutable(true, clusterTranslator);
3598                     return true;
3599                 }
3600             });
3601         }
3602         createdClusters.clear();
3603     }
3604
    /**
     * Returns the modification counter object held by the query provider.
     *
     * @return {@code queryProvider2.modificationCounter}
     */
    @Override
    public Object getModificationCounter() {
        return queryProvider2.modificationCounter;
    }
3609     
    /**
     * Marks an undo point by turning off the "combine" setting of the session state.
     * NOTE(review): presumably this stops later changes from being merged into the
     * previous undo step; confirm against {@code state.setCombine}.
     */
    @Override
    public void markUndoPoint() {
        state.setCombine(false);
    }
3614
3615 }