]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
b7adbe49bd40c5466fe7a039eb7511ffa71c6b93
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryProcessor;
93 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
94 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
95 import org.simantics.db.impl.service.QueryDebug;
96 import org.simantics.db.impl.support.VirtualGraphServerSupport;
97 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
98 import org.simantics.db.procedure.AsyncListener;
99 import org.simantics.db.procedure.AsyncMultiListener;
100 import org.simantics.db.procedure.AsyncMultiProcedure;
101 import org.simantics.db.procedure.AsyncProcedure;
102 import org.simantics.db.procedure.Listener;
103 import org.simantics.db.procedure.ListenerBase;
104 import org.simantics.db.procedure.MultiListener;
105 import org.simantics.db.procedure.MultiProcedure;
106 import org.simantics.db.procedure.Procedure;
107 import org.simantics.db.procedure.SyncListener;
108 import org.simantics.db.procedure.SyncMultiListener;
109 import org.simantics.db.procedure.SyncMultiProcedure;
110 import org.simantics.db.procedure.SyncProcedure;
111 import org.simantics.db.procore.cluster.ClusterImpl;
112 import org.simantics.db.procore.cluster.ClusterTraits;
113 import org.simantics.db.procore.protocol.Constants;
114 import org.simantics.db.procore.protocol.DebugPolicy;
115 import org.simantics.db.request.AsyncMultiRead;
116 import org.simantics.db.request.AsyncRead;
117 import org.simantics.db.request.DelayedWrite;
118 import org.simantics.db.request.DelayedWriteResult;
119 import org.simantics.db.request.ExternalRead;
120 import org.simantics.db.request.MultiRead;
121 import org.simantics.db.request.Read;
122 import org.simantics.db.request.ReadInterface;
123 import org.simantics.db.request.Write;
124 import org.simantics.db.request.WriteInterface;
125 import org.simantics.db.request.WriteOnly;
126 import org.simantics.db.request.WriteOnlyResult;
127 import org.simantics.db.request.WriteResult;
128 import org.simantics.db.request.WriteTraits;
129 import org.simantics.db.service.ByteReader;
130 import org.simantics.db.service.ClusterBuilder;
131 import org.simantics.db.service.ClusterBuilderFactory;
132 import org.simantics.db.service.ClusterControl;
133 import org.simantics.db.service.ClusterSetsSupport;
134 import org.simantics.db.service.ClusterUID;
135 import org.simantics.db.service.ClusteringSupport;
136 import org.simantics.db.service.CollectionSupport;
137 import org.simantics.db.service.DebugSupport;
138 import org.simantics.db.service.DirectQuerySupport;
139 import org.simantics.db.service.GraphChangeListenerSupport;
140 import org.simantics.db.service.InitSupport;
141 import org.simantics.db.service.LifecycleSupport;
142 import org.simantics.db.service.ManagementSupport;
143 import org.simantics.db.service.QueryControl;
144 import org.simantics.db.service.SerialisationSupport;
145 import org.simantics.db.service.ServerInformation;
146 import org.simantics.db.service.ServiceActivityMonitor;
147 import org.simantics.db.service.SessionEventSupport;
148 import org.simantics.db.service.SessionMonitorSupport;
149 import org.simantics.db.service.SessionUserSupport;
150 import org.simantics.db.service.StatementSupport;
151 import org.simantics.db.service.TransactionPolicySupport;
152 import org.simantics.db.service.TransactionSupport;
153 import org.simantics.db.service.TransferableGraphSupport;
154 import org.simantics.db.service.UndoRedoSupport;
155 import org.simantics.db.service.VirtualGraphSupport;
156 import org.simantics.db.service.XSupport;
157 import org.simantics.layer0.Layer0;
158 import org.simantics.utils.DataContainer;
159 import org.simantics.utils.Development;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
162
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
165
166
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168
169     protected static final boolean DEBUG        = false;
170
171     private static final boolean DIAGNOSTICS       = false;
172
173     private TransactionPolicySupport transactionPolicy;
174
175     final private ClusterControl clusterControl;
176
177     final protected BuiltinSupportImpl builtinSupport;
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179     final protected ClusterSetsSupport clusterSetsSupport;
180     final protected LifecycleSupportImpl lifecycleSupport;
181
182     protected QuerySupportImpl querySupport;
183     protected ResourceSupportImpl resourceSupport;
184     protected WriteSupport writeSupport;
185     public ClusterTranslator clusterTranslator;
186
187     boolean dirtyPrimitives = false;
188
189     public static final int SERVICE_MODE_CREATE = 2;
190     public static final int SERVICE_MODE_ALLOW = 1;
191     public int serviceMode = 0;
192     public boolean createdImmutableClusters = false;
193     public TLongHashSet createdClusters = new TLongHashSet();
194
195     private Layer0 L0;
196
197     /**
198      * The service locator maintained by the workbench. These services are
199      * initialized during workbench during the <code>init</code> method.
200      */
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
202
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
204
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
206
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
210
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
212
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
214
215     final protected State                                        state              = new State();
216
217     protected GraphSession                                       graphSession       = null;
218
219     protected SessionManager                                     sessionManagerImpl = null;
220
221     protected UserAuthenticationAgent                            authAgent          = null;
222
223     protected UserAuthenticator                                  authenticator      = null;
224
225     protected Resource                                           user               = null;
226
227     protected ClusterStream                                      clusterStream      = null;
228
229     protected SessionRequestManager                         requestManager = null;
230
231     public ClusterTable                                       clusterTable       = null;
232
233     public QueryProcessor                                     queryProvider2 = null;
234
235     ClientChangesImpl                                  clientChanges      = null;
236
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
238
239 //    protected long                                               newClusterId       = Constants.NullClusterId;
240
241     protected int                                                flushCounter       = 0;
242
243     protected boolean                                            writeOnly          = false;
244
245     WriteState<?>                                                writeState = null;
246     WriteStateBase<?>                                            delayedWriteState = null;
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().
248
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250
251 //        if (authAgent == null)
252 //            throw new IllegalArgumentException("null authentication agent");
253
254         File t = StaticSessionProperties.virtualGraphStoragePath;
255         if (null == t)
256             t = new File(".");
257         this.clusterTable = new ClusterTable(this, t);
258         this.builtinSupport = new BuiltinSupportImpl(this);
259         this.sessionManagerImpl = sessionManagerImpl;
260         this.user = null;
261 //        this.authAgent = authAgent;
262
263         serviceLocator.registerService(Session.class, this);
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285         ServiceActivityUpdaterForWriteTransactions.register(this);
286
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290         this.lifecycleSupport = new LifecycleSupportImpl(this);
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292         this.transactionPolicy = new TransactionPolicyRelease();
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294         this.clusterControl = new ClusterControlImpl(this);
295         serviceLocator.registerService(ClusterControl.class, clusterControl);
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297         this.clusterSetsSupport.setReadDirectory(t.toPath());
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
300     }
301
302     /*
303      *
304      * Session interface
305      */
306
307
308     @Override
309     public Resource getRootLibrary() {
310         return queryProvider2.getRootLibraryResource();
311     }
312
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314         if (!graphSession.dbSession.refreshEnabled())
315             return;
316         try {
317             getClusterTable().refresh(csid, this, clusterUID);
318         } catch (Throwable t) {
319             Logger.defaultLogError("Refesh failed.", t);
320         }
321     }
322
323     static final class TaskHelper {
324         private final String name;
325         private Object result;
326         final Semaphore sema = new Semaphore(0);
327         private Throwable throwable = null;
328         final Consumer<DatabaseException> callback = e -> {
329             synchronized (TaskHelper.this) {
330                 throwable = e;
331             }
332         };
333         final Procedure<Object> proc = new Procedure<Object>() {
334             @Override
335             public void execute(Object result) {
336                 callback.accept(null);
337             }
338             @Override
339             public void exception(Throwable t) {
340                 if (t instanceof DatabaseException)
341                     callback.accept((DatabaseException)t);
342                 else
343                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
344             }
345         };
346         final WriteTraits writeTraits = new WriteTraits() {};
347         TaskHelper(String name) {
348             this.name = name;
349         }
350         @SuppressWarnings("unchecked")
351         <T> T getResult() {
352             return (T)result;
353         }
354         void setResult(Object result) {
355             this.result = result;
356         }
357 //        Throwable throwableGet() {
358 //            return throwable;
359 //        }
360         synchronized void throwableSet(Throwable t) {
361             throwable = t;
362         }
363 //        void throwableSet(String t) {
364 //            throwable = new InternalException("" + name + " operation failed. " + t);
365 //        }
366         synchronized void throwableCheck()
367         throws DatabaseException {
368             if (null != throwable)
369                 if (throwable instanceof DatabaseException)
370                     throw (DatabaseException)throwable;
371                 else
372                     throw new DatabaseException("Undo operation failed.", throwable);
373         }
374         void throw_(String message)
375         throws DatabaseException {
376             throw new DatabaseException("" + name + " operation failed. " + message);
377         }
378     }
379
380
381
382     private ListenerBase getListenerBase(Object procedure) {
383         if (procedure instanceof ListenerBase)
384             return (ListenerBase) procedure;
385         else
386             return null;
387     }
388
389     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
390         scheduleRequest(request, callback, notify, null);
391     }
392
393     /* (non-Javadoc)
394      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
395      */
396     @Override
397     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
398
399         assert (request != null);
400
401         if(Development.DEVELOPMENT) {
402             try {
403             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
404                 System.err.println("schedule write '" + request + "'");
405             } catch (Throwable t) {
406                 Logger.defaultLogError(t);
407             }
408         }
409
410         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
411
412         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
413
414             @Override
415             public void run(int thread) {
416
417                 if(Development.DEVELOPMENT) {
418                     try {
419                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
420                         System.err.println("perform write '" + request + "'");
421                     } catch (Throwable t) {
422                         Logger.defaultLogError(t);
423                     }
424                 }
425
426                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
427
428                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
429
430                 try {
431
432                     flushCounter = 0;
433                     Disposable.safeDispose(clientChanges);
434                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
435
436                     VirtualGraph vg = getProvider(request.getProvider());
437
438                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440
441                         @Override
442                         public void execute(Object result) {
443                             if(callback != null) callback.accept(null);
444                         }
445
446                         @Override
447                         public void exception(Throwable t) {
448                             if(callback != null) callback.accept((DatabaseException)t);
449                         }
450
451                     });
452
453                     assert (null != writer);
454 //                    writer.state.barrier.inc();
455
456                     try {
457                         request.perform(writer);
458                         assert (null != writer);
459                     } catch (Throwable t) {
460                         if (!(t instanceof CancelTransactionException))
461                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462                         writeState.except(t);
463                     } finally {
464 //                        writer.state.barrier.dec();
465 //                        writer.waitAsync(request);
466                     }
467
468
469                     assert(!queryProvider2.cache.dirty);
470
471                 } catch (Throwable e) {
472
473                     // Log it first, just to be safe that the error is always logged.
474                     if (!(e instanceof CancelTransactionException))
475                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
476
477 //                    writeState.getGraph().state.barrier.dec();
478 //                    writeState.getGraph().waitAsync(request);
479
480                     writeState.except(e);
481
482 //                    try {
483 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 //                        // All we can do here is to log those, can't really pass them anywhere.
485 //                        if (callback != null) {
486 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 //                            else callback.run(new DatabaseException(e));
488 //                        }
489 //                    } catch (Throwable e2) {
490 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
491 //                    }
492 //                    boolean empty = clusterStream.reallyFlush();
493 //                    int callerThread = -1;
494 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
496 //                    try {
497 //                        // Send all local changes to server so it can calculate correct reverse change set.
498 //                        // This will call clusterStream.accept().
499 //                        state.cancelCommit(context, clusterStream);
500 //                        if (!empty) {
501 //                            if (!context.isOk()) // this is a blocking operation
502 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
504 //                        }
505 //                        state.cancelCommit2(context, clusterStream);
506 //                    } catch (DatabaseException e3) {
507 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
508 //                    } finally {
509 //                        clusterStream.setOff(false);
510 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
511 //                    }
512
513                 } finally {
514
515                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
516
517                 }
518
519                 task.finish();
520
521                 if(Development.DEVELOPMENT) {
522                     try {
523                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524                         System.err.println("finish write '" + request + "'");
525                     } catch (Throwable t) {
526                         Logger.defaultLogError(t);
527                     }
528                 }
529
530             }
531
532         }, combine);
533
534     }
535
536     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537         scheduleRequest(request, procedure, notify, null);
538     }
539
540     /* (non-Javadoc)
541      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542      */
543     @Override
544     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
545
546         assert (request != null);
547
548         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
549
550         requestManager.scheduleWrite(new SessionTask(request, thread) {
551
552             @Override
553             public void run(int thread) {
554
555                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
556
557                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558
559                 flushCounter = 0;
560                 Disposable.safeDispose(clientChanges);
561                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
562
563                 VirtualGraph vg = getProvider(request.getProvider());
564                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
565
566                 try {
567                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
568                     writeState = writeStateT;
569
570                     assert (null != writer);
571 //                    writer.state.barrier.inc();
572                     writeStateT.setResult(request.perform(writer));
573                     assert (null != writer);
574
575 //                    writer.state.barrier.dec();
576 //                    writer.waitAsync(null);
577
578                 } catch (Throwable e) {
579
580 //                    writer.state.barrier.dec();
581 //                    writer.waitAsync(null);
582
583                     writeState.except(e);
584
585 //                  state.stopWriteTransaction(clusterStream);
586 //
587 //              } catch (Throwable e) {
588 //                  // Log it first, just to be safe that the error is always logged.
589 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
590 //
591 //                  try {
592 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
593 //                      // All we can do here is to log those, can't really pass them anywhere.
594 //                      if (procedure != null) {
595 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
596 //                          else procedure.exception(new DatabaseException(e));
597 //                      }
598 //                  } catch (Throwable e2) {
599 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
600 //                  }
601 //
602 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
603 //
604 //                  state.stopWriteTransaction(clusterStream);
605
606                 } finally {
607                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
608                 }
609
610 //              if(notify != null) notify.release();
611
612                 task.finish();
613
614             }
615
616         }, combine);
617
618     }
619
620     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
621         scheduleRequest(request, callback, notify, null);
622     }
623
624     /* (non-Javadoc)
625      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
626      */
627     @Override
628     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
629
630         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
631
632         assert (request != null);
633
634         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
635
636         requestManager.scheduleWrite(new SessionTask(request, thread) {
637
638             @Override
639             public void run(int thread) {
640                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
641
642                 Procedure<Object> stateProcedure = new Procedure<Object>() {
643                     @Override
644                     public void execute(Object result) {
645                         if (callback != null)
646                             callback.accept(null);
647                     }
648                     @Override
649                     public void exception(Throwable t) {
650                         if (callback != null) {
651                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
652                             else callback.accept(new DatabaseException(t));
653                         } else
654                             Logger.defaultLogError("Unhandled exception", t);
655                     }
656                 };
657
658                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
659                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
660                 DelayedWriteGraph dwg = null;
661 //                newGraph.state.barrier.inc();
662
663                 try {
664                     dwg = new DelayedWriteGraph(newGraph);
665                     request.perform(dwg);
666                 } catch (Throwable e) {
667                     delayedWriteState.except(e);
668                     total.finish();
669                     dwg.close();
670                     return;
671                 } finally {
672 //                    newGraph.state.barrier.dec();
673 //                    newGraph.waitAsync(request);
674                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
675                 }
676
677                 delayedWriteState = null;
678
679                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
680                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
681
682                 flushCounter = 0;
683                 Disposable.safeDispose(clientChanges);
684                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
685
686                 acquireWriteOnly();
687
688                 VirtualGraph vg = getProvider(request.getProvider());
689                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
690
691                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
692
693                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
694
695                 assert (null != writer);
696 //                writer.state.barrier.inc();
697
698                 try {
699                     // Cannot cancel
700                     dwg.commit(writer, request);
701
702                         if(defaultClusterSet != null) {
703                                 XSupport xs = getService(XSupport.class);
704                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
705                                 ClusteringSupport cs = getService(ClusteringSupport.class);
706                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
707                         }
708
709                     // This makes clusters available from server
710                     clusterStream.reallyFlush();
711
712                     releaseWriteOnly(writer);
713                     handleUpdatesAndMetadata(writer);
714
715                 } catch (ServiceException e) {
716 //                    writer.state.barrier.dec();
717 //                    writer.waitAsync(null);
718
719                     // These shall be requested from server
720                     clusterTable.removeWriteOnlyClusters();
721                     // This makes clusters available from server
722                     clusterStream.reallyFlush();
723
724                     releaseWriteOnly(writer);
725                     writeState.except(e);
726                 } finally {
727                     // Debugging & Profiling
728                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
729                     task2.finish();
730                     total.finish();
731                 }
732             }
733
734         }, combine);
735
736     }
737
738     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
739         scheduleRequest(request, procedure, notify, null);
740     }
741
742     /* (non-Javadoc)
743      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
744      */
745     @Override
746     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
747         throw new Error("Not implemented");
748     }
749
750     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
751         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
752         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
753             createdClusters.add(cluster.clusterId);
754         }
755         return cluster;
756     }
757
758     class WriteOnlySupport implements WriteSupport {
759
760         ClusterStream stream;
761         ClusterImpl currentCluster;
762
763         public WriteOnlySupport() {
764             this.stream = clusterStream;
765         }
766
767         @Override
768         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
769             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
770         }
771
772         @Override
773         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
774
775             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
776
777             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
778                 if(s != queryProvider2.getRootLibrary())
779                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
780
781             try {
782                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
783             } catch (DatabaseException e) {
784                 Logger.defaultLogError(e);
785                 //throw e;
786             }
787
788             clientChanges.invalidate(s);
789
790             if (cluster.isWriteOnly())
791                 return;
792             queryProvider2.updateStatements(s, p);
793
794         }
795
796         @Override
797         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
798             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
799         }
800
801         @Override
802         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
803
804             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
805             try {
806                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
807             } catch (DatabaseException e) {
808                 Logger.defaultLogError(e);
809             }
810
811             clientChanges.invalidate(rid);
812
813             if (cluster.isWriteOnly())
814                 return;
815             queryProvider2.updateValue(rid);
816
817         }
818
819         @Override
820         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
821
822             if(amount < 65536) {
823                 claimValue(provider,resource, reader.readBytes(null, amount));
824                 return;
825             }
826
827             byte[] bytes = new byte[65536];
828
829             int rid = ((ResourceImpl)resource).id;
830             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
831             try {
832                 int left = amount;
833                 while(left > 0) {
834                     int block = Math.min(left, 65536);
835                     reader.readBytes(bytes, block);
836                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
837                     left -= block;
838                 }
839             } catch (DatabaseException e) {
840                 Logger.defaultLogError(e);
841             }
842
843             clientChanges.invalidate(rid);
844
845             if (cluster.isWriteOnly())
846                 return;
847             queryProvider2.updateValue(rid);
848
849         }
850
851         private void maintainCluster(ClusterImpl before, ClusterI after_) {
852             if(after_ != null && after_ != before) {
853                 ClusterImpl after = (ClusterImpl)after_;
854                 if(currentCluster == before) {
855                     currentCluster = after;
856                 }
857                 clusterTable.replaceCluster(after);
858             }
859         }
860
861         public int createResourceKey(int foreignCounter) throws DatabaseException {
862             if(currentCluster == null) {
863                 currentCluster = getNewResourceCluster();
864             }
865             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
866                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
867                 newCluster.foreignLookup = new byte[foreignCounter];
868                 currentCluster = newCluster;
869                 if (DEBUG)
870                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
871             }
872             return currentCluster.createResource(clusterTranslator);
873         }
874
875         @Override
876         public Resource createResource(VirtualGraph provider) throws DatabaseException {
877             if(currentCluster == null) {
878                 if (null != defaultClusterSet) {
879                         ResourceImpl result = getNewResource(defaultClusterSet);
880                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
881                     return result;
882                 } else {
883                     currentCluster = getNewResourceCluster();
884                 }
885             }
886             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
887                 if (null != defaultClusterSet) {
888                         ResourceImpl result = getNewResource(defaultClusterSet);
889                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
890                     return result;
891                 } else {
892                     currentCluster = getNewResourceCluster();
893                 }
894             }
895             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
896         }
897
898         @Override
899         public Resource createResource(VirtualGraph provider, long clusterId)
900         throws DatabaseException {
901             return getNewResource(clusterId);
902         }
903
904         @Override
905         public Resource createResource(VirtualGraph provider, Resource clusterSet)
906         throws DatabaseException {
907             return getNewResource(clusterSet);
908         }
909
910         @Override
911         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
912                 throws DatabaseException {
913             getNewClusterSet(clusterSet);
914         }
915
916         @Override
917         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
918         throws ServiceException {
919             return containsClusterSet(clusterSet);
920         }
921
922         public void selectCluster(long cluster) {
923                 currentCluster = clusterTable.getClusterByClusterId(cluster);
924                 long setResourceId = clusterSetsSupport.getSet(cluster);
925                 clusterSetsSupport.put(setResourceId, cluster);
926         }
927
928         @Override
929         public Resource setDefaultClusterSet(Resource clusterSet)
930         throws ServiceException {
931                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
932                 if(clusterSet != null) {
933                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
934                         currentCluster = clusterTable.getClusterByClusterId(id);
935                         return result;
936                 } else {
937                         currentCluster = null;
938                         return null;
939                 }
940         }
941
942         @Override
943         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
944
945                  provider = getProvider(provider);
946              if (null == provider) {
947                  int key = ((ResourceImpl)resource).id;
948
949
950
951                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
952 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
953 //                      if(key != queryProvider2.getRootLibrary())
954 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
955
956 //                 try {
957 //                     cluster.removeValue(key, clusterTranslator);
958 //                 } catch (DatabaseException e) {
959 //                     Logger.defaultLogError(e);
960 //                     return;
961 //                 }
962
963                  clusterTable.writeOnlyInvalidate(cluster);
964
965                  try {
966                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
967                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
968                          clusterTranslator.removeValue(cluster);
969                  } catch (DatabaseException e) {
970                          Logger.defaultLogError(e);
971                  }
972
973                  queryProvider2.invalidateResource(key);
974                  clientChanges.invalidate(key);
975
976              } else {
977                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
978                  queryProvider2.updateValue(querySupport.getId(resource));
979                  clientChanges.claimValue(resource);
980              }
981
982
983         }
984
985         @Override
986         public void flush(boolean intermediate) {
987             throw new UnsupportedOperationException();
988         }
989
990         @Override
991         public void flushCluster() {
992             clusterTable.flushCluster(graphSession);
993             if(defaultClusterSet != null) {
994                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
995             }
996             currentCluster = null;
997         }
998
999         @Override
1000         public void flushCluster(Resource r) {
1001             throw new UnsupportedOperationException("flushCluster resource " + r);
1002         }
1003
1004         @Override
1005         public void gc() {
1006         }
1007
1008         @Override
1009         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1010                 Resource object) {
1011
1012                 int s = ((ResourceImpl)subject).id;
1013                 int p = ((ResourceImpl)predicate).id;
1014                 int o = ((ResourceImpl)object).id;
1015
1016                 provider = getProvider(provider);
1017                 if (null == provider) {
1018
1019                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1020                         clusterTable.writeOnlyInvalidate(cluster);
1021
1022                         try {
1023
1024                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1025                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1026                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1027
1028                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1029                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1030
1031                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1032                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1034                                 clusterTranslator.removeStatement(cluster);
1035
1036                                 queryProvider2.invalidateResource(s);
1037                                 clientChanges.invalidate(s);
1038
1039                         } catch (DatabaseException e) {
1040
1041                                 Logger.defaultLogError(e);
1042
1043                         }
1044
1045                         return true;
1046
1047                 } else {
1048                         
1049                         ((VirtualGraphImpl)provider).deny(s, p, o);
1050                         queryProvider2.invalidateResource(s);
1051                         clientChanges.invalidate(s);
1052
1053                         return true;
1054
1055                 }
1056
1057         }
1058
1059         @Override
1060         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1061             throw new UnsupportedOperationException();
1062         }
1063
1064         @Override
1065         public boolean writeOnly() {
1066             return true;
1067         }
1068
1069         @Override
1070         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1071             writeSupport.performWriteRequest(graph, request);
1072         }
1073
1074         @Override
1075         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1076             throw new UnsupportedOperationException();
1077         }
1078
1079         @Override
1080         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public <T> void addMetadata(Metadata data) throws ServiceException {
1086             writeSupport.addMetadata(data);
1087         }
1088
1089         @Override
1090         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1091             return writeSupport.getMetadata(clazz);
1092         }
1093
1094         @Override
1095         public TreeMap<String, byte[]> getMetadata() {
1096             return writeSupport.getMetadata();
1097         }
1098
1099         @Override
1100         public void commitDone(WriteTraits writeTraits, long csid) {
1101             writeSupport.commitDone(writeTraits, csid);
1102         }
1103
1104         @Override
1105         public void clearUndoList(WriteTraits writeTraits) {
1106             writeSupport.clearUndoList(writeTraits);
1107         }
1108         @Override
1109         public int clearMetadata() {
1110             return writeSupport.clearMetadata();
1111         }
1112
1113                 @Override
1114                 public void startUndo() {
1115                         writeSupport.startUndo();
1116                 }
1117     }
1118
1119     class VirtualWriteOnlySupport implements WriteSupport {
1120
1121 //        @Override
1122 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1123 //                Resource object) {
1124 //            throw new UnsupportedOperationException();
1125 //        }
1126
1127         @Override
1128         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1129
1130             TransientGraph impl = (TransientGraph)provider;
1131             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1132             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1133             clientChanges.claim(subject, predicate, object);
1134
1135         }
1136
1137         @Override
1138         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1139
1140             TransientGraph impl = (TransientGraph)provider;
1141             impl.claim(subject, predicate, object);
1142             getQueryProvider2().updateStatements(subject, predicate);
1143             clientChanges.claim(subject, predicate, object);
1144
1145         }
1146
1147         @Override
1148         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1149             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1154             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1155             getQueryProvider2().updateValue(resource);
1156             clientChanges.claimValue(resource);
1157         }
1158
1159         @Override
1160         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1161             byte[] value = reader.readBytes(null, amount);
1162             claimValue(provider, resource, value);
1163         }
1164
1165         @Override
1166         public Resource createResource(VirtualGraph provider) {
1167             TransientGraph impl = (TransientGraph)provider;
1168             return impl.getResource(impl.newResource(false));
1169         }
1170
1171         @Override
1172         public Resource createResource(VirtualGraph provider, long clusterId) {
1173             throw new UnsupportedOperationException();
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1178         throws DatabaseException {
1179             throw new UnsupportedOperationException();
1180         }
1181
1182         @Override
1183         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1184         throws DatabaseException {
1185             throw new UnsupportedOperationException();
1186         }
1187
1188         @Override
1189         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1190         throws ServiceException {
1191             throw new UnsupportedOperationException();
1192         }
1193
1194         @Override
1195         public Resource setDefaultClusterSet(Resource clusterSet)
1196         throws ServiceException {
1197                 return null;
1198         }
1199
1200         @Override
1201         public void denyValue(VirtualGraph provider, Resource resource) {
1202             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1203             getQueryProvider2().updateValue(querySupport.getId(resource));
1204             // NOTE: this only keeps track of value changes by-resource.
1205             clientChanges.claimValue(resource);
1206         }
1207
1208         @Override
1209         public void flush(boolean intermediate) {
1210             throw new UnsupportedOperationException();
1211         }
1212
1213         @Override
1214         public void flushCluster() {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster(Resource r) {
1220             throw new UnsupportedOperationException("Resource " + r);
1221         }
1222
1223         @Override
1224         public void gc() {
1225         }
1226
1227         @Override
1228         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1229             TransientGraph impl = (TransientGraph) provider;
1230             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1231             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1232             clientChanges.deny(subject, predicate, object);
1233             return true;
1234         }
1235
1236         @Override
1237         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1238             throw new UnsupportedOperationException();
1239         }
1240
1241         @Override
1242         public boolean writeOnly() {
1243             return true;
1244         }
1245
1246         @Override
1247         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1248             throw new UnsupportedOperationException();
1249         }
1250
1251         @Override
1252         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public <T> void addMetadata(Metadata data) throws ServiceException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public TreeMap<String, byte[]> getMetadata() {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public void commitDone(WriteTraits writeTraits, long csid) {
1278         }
1279
1280         @Override
1281         public void clearUndoList(WriteTraits writeTraits) {
1282         }
1283
1284         @Override
1285         public int clearMetadata() {
1286             return 0;
1287         }
1288
1289                 @Override
1290                 public void startUndo() {
1291                 }
1292     }
1293
1294     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1295
1296         try {
1297
1298             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1299
1300             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1301
1302             flushCounter = 0;
1303             Disposable.safeDispose(clientChanges);
1304             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1305
1306             acquireWriteOnly();
1307
1308             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1309
1310             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1311
1312             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1313             writeState = writeStateT;
1314
1315             assert (null != writer);
1316 //            writer.state.barrier.inc();
1317             long start = System.nanoTime();
1318             T result = request.perform(writer);
1319             long duration = System.nanoTime() - start;
1320             if (DEBUG) {
1321                 System.err.println("################");
1322                 System.err.println("WriteOnly duration " + 1e-9*duration);
1323             }
1324             writeStateT.setResult(result);
1325
1326             // This makes clusters available from server
1327             clusterStream.reallyFlush();
1328
1329             // This will trigger query updates
1330             releaseWriteOnly(writer);
1331             assert (null != writer);
1332
1333             handleUpdatesAndMetadata(writer);
1334
1335         } catch (CancelTransactionException e) {
1336
1337             releaseWriteOnly(writeState.getGraph());
1338
1339             clusterTable.removeWriteOnlyClusters();
1340             state.stopWriteTransaction(clusterStream);
1341
1342         } catch (Throwable e) {
1343
1344             e.printStackTrace();
1345
1346             releaseWriteOnly(writeState.getGraph());
1347
1348             clusterTable.removeWriteOnlyClusters();
1349
1350             if (callback != null)
1351                 callback.exception(new DatabaseException(e));
1352
1353             state.stopWriteTransaction(clusterStream);
1354             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1355
1356         } finally  {
1357
1358             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1359
1360         }
1361
1362     }
1363
1364     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1365         scheduleRequest(request, callback, notify, null);
1366     }
1367
1368     @Override
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1370
1371         assertAlive();
1372
1373         assert (request != null);
1374
1375         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1376
1377         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1378
1379             @Override
1380             public void run(int thread) {
1381
1382                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1383
1384                     try {
1385
1386                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1387
1388                         flushCounter = 0;
1389                         Disposable.safeDispose(clientChanges);
1390                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1391
1392                         acquireWriteOnly();
1393
1394                         VirtualGraph vg = getProvider(request.getProvider());
1395                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1396
1397                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1398
1399                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1400
1401                             @Override
1402                             public void execute(Object result) {
1403                                 if(callback != null) callback.accept(null);
1404                             }
1405
1406                             @Override
1407                             public void exception(Throwable t) {
1408                                 if(callback != null) callback.accept((DatabaseException)t);
1409                             }
1410
1411                         });
1412
1413                         assert (null != writer);
1414 //                        writer.state.barrier.inc();
1415
1416                         try {
1417
1418                             request.perform(writer);
1419
1420                         } catch (Throwable e) {
1421
1422 //                            writer.state.barrier.dec();
1423 //                            writer.waitAsync(null);
1424
1425                             releaseWriteOnly(writer);
1426
1427                             clusterTable.removeWriteOnlyClusters();
1428
1429                             if(!(e instanceof CancelTransactionException)) {
1430                                 if (callback != null)
1431                                     callback.accept(new DatabaseException(e));
1432                             }
1433
1434                             writeState.except(e);
1435
1436                             return;
1437
1438                         }
1439
1440                         // This makes clusters available from server
1441                         boolean empty = clusterStream.reallyFlush();
1442                         // This was needed to make WO requests call metadata listeners.
1443                         // NOTE: the calling event does not contain clientChanges information.
1444                         if (!empty && clientChanges.isEmpty())
1445                             clientChanges.setNotEmpty(true);
1446                         releaseWriteOnly(writer);
1447                         assert (null != writer);
1448                     } finally  {
1449
1450                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1451
1452                     }
1453
1454
1455                 task.finish();
1456
1457             }
1458
1459         }, combine);
1460
1461     }
1462
1463     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1464         scheduleRequest(request, callback, notify, null);
1465     }
1466
1467     /* (non-Javadoc)
1468      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1469      */
1470     @Override
1471     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1472
1473         assert (request != null);
1474
1475         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1476
1477         requestManager.scheduleWrite(new SessionTask(request, thread) {
1478
1479             @Override
1480             public void run(int thread) {
1481
1482                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1483
1484                 performWriteOnly(request, notify, callback);
1485
1486                 task.finish();
1487
1488             }
1489
1490         }, combine);
1491
1492     }
1493
1494     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1495
1496         assert (request != null);
1497         assert (procedure != null);
1498
1499         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1500
1501         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1502
1503             @Override
1504             public void run(int thread) {
1505
1506                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1507
1508                 ListenerBase listener = getListenerBase(procedure);
1509
1510                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1511
1512                 try {
1513
1514                     if (listener != null) {
1515
1516                         try {
1517                                 newGraph.processor.query(newGraph, request, null, new AsyncProcedure<T>() {
1518
1519                                         @Override
1520                                         public void exception(AsyncReadGraph graph, Throwable t) {
1521                                                 procedure.exception(graph, t);
1522                                                 if(throwable != null) {
1523                                                         throwable.set(t);
1524                                                 } else {
1525                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1526                                                 }
1527                                         }
1528
1529                                         @Override
1530                                         public void execute(AsyncReadGraph graph, T t) {
1531                                                 if(result != null) result.set(t);
1532                                                 procedure.execute(graph, t);
1533                                         }
1534
1535                                 }, listener);
1536                         } catch (Throwable t) {
1537                             // This is handled by the AsyncProcedure
1538                                 //Logger.defaultLogError("Internal error", t);
1539                         }
1540
1541                     } else {
1542
1543                         try {
1544
1545 //                            newGraph.state.barrier.inc();
1546
1547                             T t = request.perform(newGraph);
1548
1549                             try {
1550
1551                                 if(result != null) result.set(t);
1552                                 procedure.execute(newGraph, t);
1553
1554                             } catch (Throwable th) {
1555
1556                                 if(throwable != null) {
1557                                     throwable.set(th);
1558                                 } else {
1559                                     Logger.defaultLogError("Unhandled exception", th);
1560                                 }
1561
1562                             }
1563
1564                         } catch (Throwable t) {
1565
1566                             if (DEBUG)
1567                                 t.printStackTrace();
1568
1569                             if(throwable != null) {
1570                                 throwable.set(t);
1571                             } else {
1572                                 Logger.defaultLogError("Unhandled exception", t);
1573                             }
1574
1575                             try {
1576
1577                                 procedure.exception(newGraph, t);
1578
1579                             } catch (Throwable t2) {
1580
1581                                 if(throwable != null) {
1582                                     throwable.set(t2);
1583                                 } else {
1584                                     Logger.defaultLogError("Unhandled exception", t2);
1585                                 }
1586
1587                             }
1588
1589                         }
1590
1591 //                        newGraph.state.barrier.dec();
1592 //                        newGraph.waitAsync(request);
1593
1594                     }
1595
1596                 } finally {
1597
1598                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1599
1600                 }
1601
1602             }
1603
1604         });
1605
1606     }
1607
1608     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1609
1610         assert (request != null);
1611         assert (procedure != null);
1612
1613         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1614
1615         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1616
1617             @Override
1618             public void run(int thread) {
1619
1620                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1621
1622                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1623
1624                 try {
1625
1626                     if (listener != null) {
1627
1628                         try {
1629                                                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1630                                                 } catch (DatabaseException e) {
1631                                                         Logger.defaultLogError(e);
1632                                                 }
1633
1634                     } else {
1635
1636                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1637                                 procedure, "request");
1638
1639                         try {
1640
1641 //                            newGraph.state.barrier.inc();
1642
1643                             request.perform(newGraph, wrapper);
1644
1645 //                            newGraph.waitAsync(request);
1646
1647                         } catch (Throwable t) {
1648
1649                             wrapper.exception(newGraph, t);
1650 //                            newGraph.waitAsync(request);
1651
1652
1653                         }
1654
1655                     }
1656
1657                 } finally {
1658
1659                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1660
1661                 }
1662
1663             }
1664
1665         });
1666
1667     }
1668
1669     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1670
1671         assert (request != null);
1672         assert (procedure != null);
1673
1674         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1675
1676         int sync = notify != null ? thread : -1;
1677
1678         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1679
1680             @Override
1681             public void run(int thread) {
1682
1683                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1684
1685                 ListenerBase listener = getListenerBase(procedure);
1686
1687                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1688
1689                 try {
1690
1691                     if (listener != null) {
1692
1693                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1694
1695 //                        newGraph.waitAsync(request);
1696
1697                     } else {
1698
1699                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1700
1701                         try {
1702
1703                             request.perform(newGraph, wrapper);
1704
1705                         } catch (Throwable t) {
1706
1707                             t.printStackTrace();
1708
1709                         }
1710
1711                     }
1712
1713                 } finally {
1714
1715                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1716
1717                 }
1718
1719             }
1720
1721         });
1722
1723     }
1724
1725     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1726
1727         assert (request != null);
1728         assert (procedure != null);
1729
1730         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1731
1732         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1733
1734             @Override
1735             public void run(int thread) {
1736
1737                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1738
1739                 ListenerBase listener = getListenerBase(procedure);
1740
1741                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1742
1743                 try {
1744
1745                     if (listener != null) {
1746
1747                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1748
1749                             @Override
1750                             public void exception(Throwable t) {
1751                                 procedure.exception(t);
1752                                 if(throwable != null) {
1753                                     throwable.set(t);
1754                                 }
1755                             }
1756
1757                             @Override
1758                             public void execute(T t) {
1759                                 if(result != null) result.set(t);
1760                                 procedure.execute(t);
1761                             }
1762
1763                         }, listener);
1764
1765 //                        newGraph.waitAsync(request);
1766
1767                     } else {
1768
1769 //                        newGraph.state.barrier.inc();
1770
1771                         request.register(newGraph, new Listener<T>() {
1772
1773                             @Override
1774                             public void exception(Throwable t) {
1775                                 if(throwable != null) throwable.set(t);
1776                                 procedure.exception(t);
1777 //                                newGraph.state.barrier.dec();
1778                             }
1779
1780                             @Override
1781                             public void execute(T t) {
1782                                 if(result != null) result.set(t);
1783                                 procedure.execute(t);
1784 //                                newGraph.state.barrier.dec();
1785                             }
1786
1787                             @Override
1788                             public boolean isDisposed() {
1789                                 return true;
1790                             }
1791
1792                         });
1793
1794 //                        newGraph.waitAsync(request);
1795
1796                     }
1797
1798                 } finally {
1799
1800                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1801
1802                 }
1803
1804             }
1805
1806         });
1807
1808     }
1809
1810
1811     @Override
1812     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1813
1814         scheduleRequest(request, procedure, null, null, null);
1815
1816     }
1817
1818     @Override
1819     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1820         assertNotSession();
1821         Semaphore notify = new Semaphore(0);
1822         DataContainer<Throwable> container = new DataContainer<Throwable>();
1823         DataContainer<T> result = new DataContainer<T>();
1824         scheduleRequest(request, procedure, notify, container, result);
1825         acquire(notify, request);
1826         Throwable throwable = container.get();
1827         if(throwable != null) {
1828             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1829             else throw new DatabaseException("Unexpected exception", throwable);
1830         }
1831         return result.get();
1832     }
1833
1834     @Override
1835     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1836         assertNotSession();
1837         Semaphore notify = new Semaphore(0);
1838         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1839         final DataContainer<T> resultContainer = new DataContainer<T>();
1840         scheduleRequest(request, new AsyncProcedure<T>() {
1841             @Override
1842             public void exception(AsyncReadGraph graph, Throwable throwable) {
1843                 exceptionContainer.set(throwable);
1844                 procedure.exception(graph, throwable);
1845             }
1846             @Override
1847             public void execute(AsyncReadGraph graph, T result) {
1848                 resultContainer.set(result);
1849                 procedure.execute(graph, result);
1850             }
1851         }, getListenerBase(procedure), notify);
1852         acquire(notify, request, procedure);
1853         Throwable throwable = exceptionContainer.get();
1854         if (throwable != null) {
1855             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1856             else throw new DatabaseException("Unexpected exception", throwable);
1857         }
1858         return resultContainer.get();
1859     }
1860
1861
1862     @Override
1863     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1864         assertNotSession();
1865         Semaphore notify = new Semaphore(0);
1866         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1867         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1868             @Override
1869             public void exception(AsyncReadGraph graph, Throwable throwable) {
1870                 exceptionContainer.set(throwable);
1871                 procedure.exception(graph, throwable);
1872             }
1873             @Override
1874             public void execute(AsyncReadGraph graph, T result) {
1875                 procedure.execute(graph, result);
1876             }
1877             @Override
1878             public void finished(AsyncReadGraph graph) {
1879                 procedure.finished(graph);
1880             }
1881         }, notify);
1882         acquire(notify, request, procedure);
1883         Throwable throwable = exceptionContainer.get();
1884         if (throwable != null) {
1885             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1886             else throw new DatabaseException("Unexpected exception", throwable);
1887         }
1888         // TODO: implement return value
1889         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1890         return null;
1891     }
1892
1893     @Override
1894     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1895         return syncRequest(request, new ProcedureAdapter<T>());
1896
1897
1898 //        assert(request != null);
1899 //
1900 //        final DataContainer<T> result = new DataContainer<T>();
1901 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1902 //
1903 //        syncRequest(request, new Procedure<T>() {
1904 //
1905 //            @Override
1906 //            public void execute(T t) {
1907 //                result.set(t);
1908 //            }
1909 //
1910 //            @Override
1911 //            public void exception(Throwable t) {
1912 //                exception.set(t);
1913 //            }
1914 //
1915 //        });
1916 //
1917 //        Throwable t = exception.get();
1918 //        if(t != null) {
1919 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1920 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1921 //        }
1922 //
1923 //        return result.get();
1924
1925     }
1926
1927     @Override
1928     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1929         return syncRequest(request, (Procedure<T>)procedure);
1930     }
1931
1932
1933 //    @Override
1934 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1935 //        assertNotSession();
1936 //        Semaphore notify = new Semaphore(0);
1937 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1938 //        DataContainer<T> result = new DataContainer<T>();
1939 //        scheduleRequest(request, procedure, notify, container, result);
1940 //        acquire(notify, request);
1941 //        Throwable throwable = container.get();
1942 //        if(throwable != null) {
1943 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1944 //            else throw new DatabaseException("Unexpected exception", throwable);
1945 //        }
1946 //        return result.get();
1947 //    }
1948
1949
1950     @Override
1951     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1952         assertNotSession();
1953         Semaphore notify = new Semaphore(0);
1954         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1955         final DataContainer<T> result = new DataContainer<T>();
1956         scheduleRequest(request, procedure, notify, container, result);
1957         acquire(notify, request);
1958         Throwable throwable = container.get();
1959         if (throwable != null) {
1960             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1961             else throw new DatabaseException("Unexpected exception", throwable);
1962         }
1963         return result.get();
1964     }
1965
1966     @Override
1967     public void syncRequest(Write request) throws DatabaseException  {
1968         assertNotSession();
1969         assertAlive();
1970         Semaphore notify = new Semaphore(0);
1971         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1972         scheduleRequest(request, e -> exception.set(e), notify);
1973         acquire(notify, request);
1974         if(exception.get() != null) throw exception.get();
1975     }
1976
1977     @Override
1978     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1979         assertNotSession();
1980         Semaphore notify = new Semaphore(0);
1981         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1982         final DataContainer<T> result = new DataContainer<T>();
1983         scheduleRequest(request, new Procedure<T>() {
1984
1985             @Override
1986             public void exception(Throwable t) {
1987               exception.set(t);
1988             }
1989
1990             @Override
1991             public void execute(T t) {
1992               result.set(t);
1993             }
1994
1995         }, notify);
1996         acquire(notify, request);
1997         if(exception.get() != null) {
1998             Throwable t = exception.get();
1999             if(t instanceof DatabaseException) throw (DatabaseException)t;
2000             else throw new DatabaseException(t);
2001         }
2002         return result.get();
2003     }
2004
2005     @Override
2006     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2007         assertNotSession();
2008         Semaphore notify = new Semaphore(0);
2009         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2010         final DataContainer<T> result = new DataContainer<T>();
2011         scheduleRequest(request, new Procedure<T>() {
2012
2013             @Override
2014             public void exception(Throwable t) {
2015               exception.set(t);
2016             }
2017
2018             @Override
2019             public void execute(T t) {
2020               result.set(t);
2021             }
2022
2023         }, notify);
2024         acquire(notify, request);
2025         if(exception.get() != null) {
2026             Throwable t = exception.get();
2027             if(t instanceof DatabaseException) throw (DatabaseException)t;
2028             else throw new DatabaseException(t);
2029         }
2030         return result.get();
2031     }
2032
2033     @Override
2034     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2035         assertNotSession();
2036         Semaphore notify = new Semaphore(0);
2037         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2038         final DataContainer<T> result = new DataContainer<T>();
2039         scheduleRequest(request, new Procedure<T>() {
2040
2041             @Override
2042             public void exception(Throwable t) {
2043               exception.set(t);
2044             }
2045
2046             @Override
2047             public void execute(T t) {
2048               result.set(t);
2049             }
2050
2051         }, notify);
2052         acquire(notify, request);
2053         if(exception.get() != null) {
2054             Throwable t = exception.get();
2055             if(t instanceof DatabaseException) throw (DatabaseException)t;
2056             else throw new DatabaseException(t);
2057         }
2058         return result.get();
2059     }
2060
2061     @Override
2062     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2063         assertNotSession();
2064         Semaphore notify = new Semaphore(0);
2065         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2066         scheduleRequest(request, e -> exception.set(e), notify);
2067         acquire(notify, request);
2068         if(exception.get() != null) throw exception.get();
2069     }
2070
2071     @Override
2072     public void syncRequest(WriteOnly request) throws DatabaseException  {
2073         assertNotSession();
2074         assertAlive();
2075         Semaphore notify = new Semaphore(0);
2076         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2077         scheduleRequest(request, e -> exception.set(e), notify);
2078         acquire(notify, request);
2079         if(exception.get() != null) throw exception.get();
2080     }
2081
2082     /*
2083      *
2084      * GraphRequestProcessor interface
2085      */
2086
2087     @Override
2088     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2089
2090         scheduleRequest(request, procedure, null, null);
2091
2092     }
2093
2094     @Override
2095     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2096
2097         scheduleRequest(request, procedure, null);
2098
2099     }
2100
2101     @Override
2102     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2103
2104         scheduleRequest(request, procedure, null, null, null);
2105
2106     }
2107
2108     @Override
2109     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2110
2111         scheduleRequest(request, callback, null);
2112
2113     }
2114
2115     @Override
2116     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2117
2118         scheduleRequest(request, procedure, null);
2119
2120     }
2121
2122     @Override
2123     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2124
2125         scheduleRequest(request, procedure, null);
2126
2127     }
2128
2129     @Override
2130     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2131
2132         scheduleRequest(request, procedure, null);
2133
2134     }
2135
2136     @Override
2137     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2138
2139         scheduleRequest(request, callback, null);
2140
2141     }
2142
2143     @Override
2144     public void asyncRequest(final Write r) {
2145         asyncRequest(r, null);
2146     }
2147
2148     @Override
2149     public void asyncRequest(final DelayedWrite r) {
2150         asyncRequest(r, null);
2151     }
2152
2153     @Override
2154     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2155
2156         scheduleRequest(request, callback, null);
2157
2158     }
2159
2160     @Override
2161     public void asyncRequest(final WriteOnly request) {
2162
2163         asyncRequest(request, null);
2164
2165     }
2166
2167     @Override
2168     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2169         r.request(this, procedure);
2170     }
2171
2172     @Override
2173     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2174         r.request(this, procedure);
2175     }
2176
2177     @Override
2178     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2179         r.request(this, procedure);
2180     }
2181
2182     @Override
2183     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2184         r.request(this, procedure);
2185     }
2186
2187     @Override
2188     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2189         r.request(this, procedure);
2190     }
2191
2192     @Override
2193     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2194         r.request(this, procedure);
2195     }
2196
2197     @Override
2198     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2199         return r.request(this);
2200     }
2201
2202     @Override
2203     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2204         return r.request(this);
2205     }
2206
2207     @Override
2208     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2209         r.request(this, procedure);
2210     }
2211
2212     @Override
2213     public <T> void async(WriteInterface<T> r) {
2214         r.request(this, new ProcedureAdapter<T>());
2215     }
2216
2217     //@Override
2218     public void incAsync() {
2219         state.incAsync();
2220     }
2221
2222     //@Override
2223     public void decAsync() {
2224         state.decAsync();
2225     }
2226
2227     public long getCluster(ResourceImpl resource) {
2228         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2229         return cluster.getClusterId();
2230     }
2231
2232     public long getCluster(int id) {
2233         if (clusterTable == null)
2234             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2235         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2236     }
2237
2238     public ResourceImpl getResource(int id) {
2239         return new ResourceImpl(resourceSupport, id);
2240     }
2241
2242     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2243         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2244         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2245         int key = proxy.getClusterKey();
2246         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2247         return new ResourceImpl(resourceSupport, resourceKey);
2248     }
2249
2250     public ResourceImpl getResource2(int id) {
2251         assert (id != 0);
2252         return new ResourceImpl(resourceSupport, id);
2253     }
2254
2255     final public int getId(ResourceImpl impl) {
2256         return impl.id;
2257     }
2258
2259     public static final Charset UTF8 = Charset.forName("utf-8");
2260
2261
2262
2263
2264     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2265         for(TransientGraph g : support.providers) {
2266             if(g.isPending(subject)) return false;
2267         }
2268         return true;
2269     }
2270
2271     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2272         for(TransientGraph g : support.providers) {
2273             if(g.isPending(subject, predicate)) return false;
2274         }
2275         return true;
2276     }
2277
2278     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2279
2280         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2281
2282             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2283
2284             @Override
2285             public void accept(ReadGraphImpl graph) {
2286                 if(ready.decrementAndGet() == 0) {
2287                     runnable.accept(graph);
2288                 }
2289             }
2290
2291         };
2292
2293         for(TransientGraph g : support.providers) {
2294             if(g.isPending(subject)) {
2295                 try {
2296                     g.load(graph, subject, composite);
2297                 } catch (DatabaseException e) {
2298                     e.printStackTrace();
2299                 }
2300             } else {
2301                 composite.accept(graph);
2302             }
2303         }
2304
2305         composite.accept(graph);
2306
2307     }
2308
2309     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2310
2311         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2312
2313             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2314
2315             @Override
2316             public void accept(ReadGraphImpl graph) {
2317                 if(ready.decrementAndGet() == 0) {
2318                     runnable.accept(graph);
2319                 }
2320             }
2321
2322         };
2323
2324         for(TransientGraph g : support.providers) {
2325             if(g.isPending(subject, predicate)) {
2326                 try {
2327                     g.load(graph, subject, predicate, composite);
2328                 } catch (DatabaseException e) {
2329                     e.printStackTrace();
2330                 }
2331             } else {
2332                 composite.accept(graph);
2333             }
2334         }
2335
2336         composite.accept(graph);
2337
2338     }
2339
2340 //    void dumpHeap() {
2341 //
2342 //        try {
2343 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2344 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2345 //                    "d:/heap" + flushCounter + ".txt", true);
2346 //        } catch (IOException e) {
2347 //            e.printStackTrace();
2348 //        }
2349 //
2350 //    }
2351
2352     void fireReactionsToSynchronize(ChangeSet cs) {
2353
2354         // Do not fire empty events
2355         if (cs.isEmpty())
2356             return;
2357
2358         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2359 //        g.state.barrier.inc();
2360
2361         try {
2362
2363             if (!cs.isEmpty()) {
2364                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2365                 for (ChangeListener l : changeListeners2) {
2366                     try {
2367                         l.graphChanged(e2);
2368                     } catch (Exception ex) {
2369                         ex.printStackTrace();
2370                     }
2371                 }
2372             }
2373
2374         } finally {
2375
2376 //            g.state.barrier.dec();
2377 //            g.waitAsync(null);
2378
2379         }
2380
2381     }
2382
2383     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2384         try {
2385
2386             // Do not fire empty events
2387             if (cs2.isEmpty())
2388                 return;
2389
2390 //            graph.restart();
2391 //            graph.state.barrier.inc();
2392
2393             try {
2394
2395                 if (!cs2.isEmpty()) {
2396
2397                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2398                     for (ChangeListener l : changeListeners2) {
2399                         try {
2400                             l.graphChanged(e2);
2401 //                            System.out.println("changelistener " + l);
2402                         } catch (Exception ex) {
2403                             ex.printStackTrace();
2404                         }
2405                     }
2406
2407                 }
2408
2409             } finally {
2410
2411 //                graph.state.barrier.dec();
2412 //                graph.waitAsync(null);
2413
2414             }
2415         } catch (Throwable t) {
2416             t.printStackTrace();
2417
2418         }
2419     }
2420     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2421
2422         try {
2423
2424             // Do not fire empty events
2425             if (cs2.isEmpty())
2426                 return;
2427
2428             // Do not fire on virtual requests
2429             if(graph.getProvider() != null)
2430                 return;
2431
2432             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2433
2434             try {
2435
2436                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2437                 for (ChangeListener l : metadataListeners) {
2438                     try {
2439                         l.graphChanged(e2);
2440                     } catch (Throwable ex) {
2441                         ex.printStackTrace();
2442                     }
2443                 }
2444
2445             } finally {
2446
2447             }
2448
2449         } catch (Throwable t) {
2450             t.printStackTrace();
2451         }
2452
2453     }
2454
2455
2456     /*
2457      * (non-Javadoc)
2458      *
2459      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2460      */
2461     @Override
2462     public <T> T getService(Class<T> api) {
2463         T t = peekService(api);
2464         if (t == null) {
2465             if (state.isClosed())
2466                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2467             throw new ServiceNotFoundException(this, api);
2468         }
2469         return t;
2470     }
2471
2472     /**
2473      * @return
2474      */
2475     protected abstract ServerInformation getCachedServerInformation();
2476
2477         private Class<?> serviceKey1 = null;
2478         private Class<?> serviceKey2 = null;
2479         private Object service1 = null;
2480         private Object service2 = null;
2481
2482     /*
2483      * (non-Javadoc)
2484      *
2485      * @see
2486      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2487      */
2488     @SuppressWarnings("unchecked")
2489     @Override
2490     public synchronized <T> T peekService(Class<T> api) {
2491
2492                 if(serviceKey1 == api) {
2493                         return (T)service1;
2494                 } else if (serviceKey2 == api) {
2495                         // Promote this key
2496                         Object result = service2;
2497                         service2 = service1;
2498                         serviceKey2 = serviceKey1;
2499                         service1 = result;
2500                         serviceKey1 = api;
2501                         return (T)result;
2502                 }
2503
2504         if (Layer0.class == api)
2505                 return (T) L0;
2506         if (ServerInformation.class == api)
2507             return (T) getCachedServerInformation();
2508         else if (WriteGraphImpl.class == api)
2509             return (T) writeState.getGraph();
2510         else if (ClusterBuilder.class == api)
2511             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2512         else if (ClusterBuilderFactory.class == api)
2513             return (T)new ClusterBuilderFactoryImpl(this);
2514
2515                 service2 = service1;
2516                 serviceKey2 = serviceKey1;
2517
2518         service1 = serviceLocator.peekService(api);
2519         serviceKey1 = api;
2520
2521         return (T)service1;
2522
2523     }
2524
2525     /*
2526      * (non-Javadoc)
2527      *
2528      * @see
2529      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2530      */
2531     @Override
2532     public boolean hasService(Class<?> api) {
2533         return serviceLocator.hasService(api);
2534     }
2535
2536     /**
2537      * @param api the api that must be implemented by the specified service
2538      * @param service the service implementation
2539      */
2540     @Override
2541     public <T> void registerService(Class<T> api, T service) {
2542         if(Layer0.class == api) {
2543                 L0 = (Layer0)service;
2544                 return;
2545         }
2546         serviceLocator.registerService(api, service);
2547         if (TransactionPolicySupport.class == api) {
2548             transactionPolicy = (TransactionPolicySupport)service;
2549             state.resetTransactionPolicy();
2550         }
2551         if (api == serviceKey1)
2552                 service1 = service;
2553         else if (api == serviceKey2)
2554                 service2 = service;
2555     }
2556
2557     // ----------------
2558     // SessionMonitor
2559     // ----------------
2560
2561     void fireSessionVariableChange(String variable) {
2562         for (MonitorHandler h : monitorHandlers) {
2563             MonitorContext ctx = monitorContexts.getLeft(h);
2564             assert ctx != null;
2565             // SafeRunner functionality repeated here to avoid dependency.
2566             try {
2567                 h.valuesChanged(ctx);
2568             } catch (Exception e) {
2569                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2570             } catch (LinkageError e) {
2571                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2572             }
2573         }
2574     }
2575
2576
2577     class ResourceSerializerImpl implements ResourceSerializer {
2578
2579         public long createRandomAccessId(int id) throws DatabaseException {
2580             if(id < 0)
2581                 return id;
2582             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2583             long cluster = getCluster(id);
2584             if (0 == cluster)
2585                 return 0; // Better to return 0 then invalid id.
2586             long result = ClusterTraitsBase.createResourceId(cluster, index);
2587             return result;
2588         }
2589
2590         @Override
2591         public long getRandomAccessId(Resource resource) throws DatabaseException {
2592
2593             ResourceImpl resourceImpl = (ResourceImpl) resource;
2594             return createRandomAccessId(resourceImpl.id);
2595
2596         }
2597
2598         @Override
2599         public int getTransientId(Resource resource) throws DatabaseException {
2600
2601             ResourceImpl resourceImpl = (ResourceImpl) resource;
2602             return resourceImpl.id;
2603
2604         }
2605
2606         @Override
2607         public String createRandomAccessId(Resource resource)
2608         throws InvalidResourceReferenceException {
2609
2610             if(resource == null) throw new IllegalArgumentException();
2611
2612             ResourceImpl resourceImpl = (ResourceImpl) resource;
2613
2614             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2615
2616             int r;
2617             try {
2618                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2619             } catch (DatabaseException e1) {
2620                 throw new InvalidResourceReferenceException(e1);
2621             }
2622             try {
2623                 // Serialize as '<resource index>_<cluster id>'
2624                 return "" + r + "_" + getCluster(resourceImpl);
2625             } catch (Throwable e) {
2626                 e.printStackTrace();
2627                 throw new InvalidResourceReferenceException(e);
2628             } finally {
2629             }
2630         }
2631
2632         public int getTransientId(long serialized) throws DatabaseException {
2633             if (serialized <= 0)
2634                 return (int)serialized;
2635             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2636             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2637             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2638             if (cluster == null)
2639                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2640             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2641             return key;
2642         }
2643
2644         @Override
2645         public Resource getResource(long randomAccessId) throws DatabaseException {
2646             return getResourceByKey(getTransientId(randomAccessId));
2647         }
2648
2649         @Override
2650         public Resource getResource(int transientId) throws DatabaseException {
2651             return getResourceByKey(transientId);
2652         }
2653
2654         @Override
2655         public Resource getResource(String randomAccessId)
2656         throws InvalidResourceReferenceException {
2657             try {
2658                 int i = randomAccessId.indexOf('_');
2659                 if (i == -1)
2660                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2661                             + randomAccessId + "'");
2662                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2663                 if(r < 0) return getResourceByKey(r);
2664                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2665                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2666                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2667                 if (cluster.hasResource(key, clusterTranslator))
2668                     return getResourceByKey(key);
2669             } catch (InvalidResourceReferenceException e) {
2670                 throw e;
2671             } catch (NumberFormatException e) {
2672                 throw new InvalidResourceReferenceException(e);
2673             } catch (Throwable e) {
2674                 e.printStackTrace();
2675                 throw new InvalidResourceReferenceException(e);
2676             } finally {
2677             }
2678             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2679         }
2680
2681         @Override
2682         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2683             try {
2684                 return true;
2685             } catch (Throwable e) {
2686                 e.printStackTrace();
2687                 throw new InvalidResourceReferenceException(e);
2688             } finally {
2689             }
2690         }
2691     }
2692
2693     // Copied from old SessionImpl
2694
2695     void check() {
2696         if (state.isClosed())
2697             throw new Error("Session closed.");
2698     }
2699
2700
2701
2702     public ResourceImpl getNewResource(long clusterId) {
2703         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2704         int newId;
2705         try {
2706             newId = cluster.createResource(clusterTranslator);
2707         } catch (DatabaseException e) {
2708             Logger.defaultLogError(e);
2709             return null;
2710         }
2711         return new ResourceImpl(resourceSupport, newId);
2712     }
2713
2714     public ResourceImpl getNewResource(Resource clusterSet)
2715     throws DatabaseException {
2716         long resourceId = clusterSet.getResourceId();
2717         Long clusterId = clusterSetsSupport.get(resourceId);
2718         if (null == clusterId)
2719             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2720         if (Constants.NewClusterId == clusterId) {
2721             clusterId = getService(ClusteringSupport.class).createCluster();
2722             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2723                 createdClusters.add(clusterId);
2724             clusterSetsSupport.put(resourceId, clusterId);
2725             return getNewResource(clusterId);
2726         } else {
2727             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2728             ResourceImpl result;
2729             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2730                 clusterId = getService(ClusteringSupport.class).createCluster();
2731                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2732                     createdClusters.add(clusterId);
2733                 clusterSetsSupport.put(resourceId, clusterId);
2734                 return getNewResource(clusterId);
2735             } else {
2736                 result = getNewResource(clusterId);
2737                 int resultKey = querySupport.getId(result);
2738                 long resultCluster = querySupport.getClusterId(resultKey);
2739                 if (clusterId != resultCluster)
2740                     clusterSetsSupport.put(resourceId, resultCluster);
2741                 return result;
2742             }
2743         }
2744     }
2745
2746     public void getNewClusterSet(Resource clusterSet)
2747     throws DatabaseException {
2748         if (DEBUG)
2749             System.out.println("new cluster set=" + clusterSet);
2750         long resourceId = clusterSet.getResourceId();
2751         if (clusterSetsSupport.containsKey(resourceId))
2752             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2753         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2754     }
2755     public boolean containsClusterSet(Resource clusterSet)
2756     throws ServiceException {
2757         long resourceId = clusterSet.getResourceId();
2758         return clusterSetsSupport.containsKey(resourceId);
2759     }
2760     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2761         Resource r = defaultClusterSet;
2762         defaultClusterSet = clusterSet;
2763         return r;
2764     }
2765     void printDiagnostics() {
2766
2767         if (DIAGNOSTICS) {
2768
2769             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2770             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2771             for(ClusterI cluster : clusterTable.getClusters()) {
2772                 try {
2773                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2774                         residentClusters.set(residentClusters.get() + 1);
2775                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2776                     }
2777                 } catch (DatabaseException e) {
2778                     Logger.defaultLogError(e);
2779                 }
2780             }
2781
2782             System.out.println("--------------------------------");
2783             System.out.println("Cluster information:");
2784             System.out.println("-amount of resident clusters=" + residentClusters.get());
2785             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2786
2787             for(ClusterI cluster : clusterTable.getClusters()) {
2788                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2789                 try {
2790                     if (!cluster.isLoaded())
2791                         System.out.println("not loaded]");
2792                     else if (cluster.isEmpty())
2793                         System.out.println("is empty]");
2794                     else {
2795                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2796                         System.out.print(",references=" + cluster.getReferenceCount());
2797                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2798                     }
2799                 } catch (DatabaseException e) {
2800                     Logger.defaultLogError(e);
2801                     System.out.println("is corrupted]");
2802                 }
2803             }
2804
2805             queryProvider2.printDiagnostics();
2806             System.out.println("--------------------------------");
2807         }
2808
2809     }
2810
2811     public void fireStartReadTransaction() {
2812
2813         if (DIAGNOSTICS)
2814             System.out.println("StartReadTransaction");
2815
2816         for (SessionEventListener listener : eventListeners) {
2817             try {
2818                 listener.readTransactionStarted();
2819             } catch (Throwable t) {
2820                 t.printStackTrace();
2821             }
2822         }
2823
2824     }
2825
2826     public void fireFinishReadTransaction() {
2827
2828         if (DIAGNOSTICS)
2829             System.out.println("FinishReadTransaction");
2830
2831         for (SessionEventListener listener : eventListeners) {
2832             try {
2833                 listener.readTransactionFinished();
2834             } catch (Throwable t) {
2835                 t.printStackTrace();
2836             }
2837         }
2838
2839     }
2840
2841     public void fireStartWriteTransaction() {
2842
2843         if (DIAGNOSTICS)
2844             System.out.println("StartWriteTransaction");
2845
2846         for (SessionEventListener listener : eventListeners) {
2847             try {
2848                 listener.writeTransactionStarted();
2849             } catch (Throwable t) {
2850                 t.printStackTrace();
2851             }
2852         }
2853
2854     }
2855
2856     public void fireFinishWriteTransaction() {
2857
2858         if (DIAGNOSTICS)
2859             System.out.println("FinishWriteTransaction");
2860
2861         for (SessionEventListener listener : eventListeners) {
2862             try {
2863                 listener.writeTransactionFinished();
2864             } catch (Throwable t) {
2865                 t.printStackTrace();
2866             }
2867         }
2868
2869         Indexing.resetDependenciesIndexingDisabled();
2870
2871     }
2872
2873     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2874         throw new Error("Not supported at the moment.");
2875     }
2876
2877     State getState() {
2878         return state;
2879     }
2880
2881     ClusterTable getClusterTable() {
2882         return clusterTable;
2883     }
2884
2885     public GraphSession getGraphSession() {
2886         return graphSession;
2887     }
2888
2889     public QueryProcessor getQueryProvider2() {
2890         return queryProvider2;
2891     }
2892
2893     ClientChangesImpl getClientChanges() {
2894         return clientChanges;
2895     }
2896
2897     boolean getWriteOnly() {
2898         return writeOnly;
2899     }
2900
2901     static int counter = 0;
2902
2903     public void onClusterLoaded(long clusterId) {
2904
2905         clusterTable.updateSize();
2906
2907     }
2908
2909
2910
2911
2912     /*
2913      * Implementation of the interface RequestProcessor
2914      */
2915
2916     @Override
2917     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2918
2919         assertNotSession();
2920         assertAlive();
2921
2922         assert(request != null);
2923
2924         final DataContainer<T> result = new DataContainer<T>();
2925         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2926
2927         syncRequest(request, new AsyncProcedure<T>() {
2928
2929             @Override
2930             public void execute(AsyncReadGraph graph, T t) {
2931                 result.set(t);
2932             }
2933
2934             @Override
2935             public void exception(AsyncReadGraph graph, Throwable t) {
2936                 exception.set(t);
2937             }
2938
2939         });
2940
2941         Throwable t = exception.get();
2942         if(t != null) {
2943             if(t instanceof DatabaseException) throw (DatabaseException)t;
2944             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2945         }
2946
2947         return result.get();
2948
2949     }
2950
2951 //    @Override
2952 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2953 //        assertNotSession();
2954 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2955 //    }
2956
2957     @Override
2958     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2959         assertNotSession();
2960         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2961     }
2962
2963     @Override
2964     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2965         assertNotSession();
2966         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2967     }
2968
2969     @Override
2970     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2971         assertNotSession();
2972         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2973     }
2974
2975     @Override
2976     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2977         assertNotSession();
2978         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2979     }
2980
2981     @Override
2982     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2983
2984         assertNotSession();
2985
2986         assert(request != null);
2987
2988         final DataContainer<T> result = new DataContainer<T>();
2989         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2990
2991         syncRequest(request, new AsyncProcedure<T>() {
2992
2993             @Override
2994             public void execute(AsyncReadGraph graph, T t) {
2995                 result.set(t);
2996             }
2997
2998             @Override
2999             public void exception(AsyncReadGraph graph, Throwable t) {
3000                 exception.set(t);
3001             }
3002
3003         });
3004
3005         Throwable t = exception.get();
3006         if(t != null) {
3007             if(t instanceof DatabaseException) throw (DatabaseException)t;
3008             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3009         }
3010
3011         return result.get();
3012
3013     }
3014
3015     @Override
3016     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3017         assertNotSession();
3018         return syncRequest(request, (AsyncProcedure<T>)procedure);
3019     }
3020
3021     @Override
3022     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3023         assertNotSession();
3024         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3025     }
3026
3027     @Override
3028     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3029         assertNotSession();
3030         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3031     }
3032
3033     @Override
3034     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3035         assertNotSession();
3036         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3037     }
3038
3039     @Override
3040     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3041         assertNotSession();
3042         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3043     }
3044
3045     @Override
3046     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3047
3048         assertNotSession();
3049
3050         assert(request != null);
3051
3052         final ArrayList<T> result = new ArrayList<T>();
3053         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3054
3055         syncRequest(request, new AsyncMultiProcedure<T>() {
3056
3057             @Override
3058             public void execute(AsyncReadGraph graph, T t) {
3059                 synchronized(result) {
3060                     result.add(t);
3061                 }
3062             }
3063
3064             @Override
3065             public void finished(AsyncReadGraph graph) {
3066             }
3067
3068             @Override
3069             public void exception(AsyncReadGraph graph, Throwable t) {
3070                 exception.set(t);
3071             }
3072
3073
3074         });
3075
3076         Throwable t = exception.get();
3077         if(t != null) {
3078             if(t instanceof DatabaseException) throw (DatabaseException)t;
3079             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3080         }
3081
3082         return result;
3083
3084     }
3085
3086     @Override
3087     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3088         assertNotSession();
3089         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3090     }
3091
3092     @Override
3093     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3094         assertNotSession();
3095         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3096     }
3097
3098     @Override
3099     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3100         assertNotSession();
3101         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3102     }
3103
3104     @Override
3105     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3108     }
3109
3110     @Override
3111     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3114     }
3115
3116     @Override
3117     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3118
3119         assertNotSession();
3120
3121         assert(request != null);
3122
3123         final ArrayList<T> result = new ArrayList<T>();
3124         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3125
3126         syncRequest(request, new AsyncMultiProcedure<T>() {
3127
3128             @Override
3129             public void execute(AsyncReadGraph graph, T t) {
3130                 synchronized(result) {
3131                     result.add(t);
3132                 }
3133             }
3134
3135             @Override
3136             public void finished(AsyncReadGraph graph) {
3137             }
3138
3139             @Override
3140             public void exception(AsyncReadGraph graph, Throwable t) {
3141                 exception.set(t);
3142             }
3143
3144
3145         });
3146
3147         Throwable t = exception.get();
3148         if(t != null) {
3149             if(t instanceof DatabaseException) throw (DatabaseException)t;
3150             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3151         }
3152
3153         return result;
3154
3155     }
3156
3157     @Override
3158     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3159         assertNotSession();
3160         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3161     }
3162
3163     @Override
3164     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3165         assertNotSession();
3166         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3167     }
3168
3169     @Override
3170     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3171         assertNotSession();
3172        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3177         assertNotSession();
3178         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3179     }
3180
3181     @Override
3182     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3183         assertNotSession();
3184         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3185     }
3186
3187
3188     @Override
3189     public <T> void asyncRequest(Read<T> request) {
3190
3191         asyncRequest(request, new ProcedureAdapter<T>() {
3192             @Override
3193             public void exception(Throwable t) {
3194                 t.printStackTrace();
3195             }
3196         });
3197
3198     }
3199
3200     @Override
3201     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3202         asyncRequest(request, (AsyncProcedure<T>)procedure);
3203     }
3204
3205     @Override
3206     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3207         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3208     }
3209
3210     @Override
3211     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3212         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3213     }
3214
3215     @Override
3216     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3217         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3218     }
3219
3220     @Override
3221     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3222         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3223     }
3224
3225     @Override
3226     final public <T> void asyncRequest(final AsyncRead<T> request) {
3227
3228         assert(request != null);
3229
3230         asyncRequest(request, new ProcedureAdapter<T>() {
3231             @Override
3232             public void exception(Throwable t) {
3233                 t.printStackTrace();
3234             }
3235         });
3236
3237     }
3238
    /**
     * Asynchronous read with an {@link AsyncListener}: hands the request straight
     * to {@code scheduleRequest}, passing the listener as both the second and the
     * third argument and {@code null} as the fourth.
     *
     * @param request the read to schedule
     * @param procedure the listener passed to {@code scheduleRequest}
     */
    @Override
    public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
        scheduleRequest(request, procedure, procedure, null);
    }
3243
3244     @Override
3245     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3246         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3247     }
3248
3249     @Override
3250     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3251         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3252     }
3253
3254     @Override
3255     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3256         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3257     }
3258
3259     @Override
3260     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3261         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3262     }
3263
3264     @Override
3265     public <T> void asyncRequest(MultiRead<T> request) {
3266
3267         assert(request != null);
3268
3269         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3270             @Override
3271             public void exception(AsyncReadGraph graph, Throwable t) {
3272                 t.printStackTrace();
3273             }
3274         });
3275
3276     }
3277
3278     @Override
3279     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3280         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3281     }
3282
3283     @Override
3284     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3285         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3286     }
3287
3288     @Override
3289     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3290         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3291     }
3292
3293     @Override
3294     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3295         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3296     }
3297
3298     @Override
3299     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3300         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3301     }
3302
3303     @Override
3304     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3305
3306         assert(request != null);
3307
3308         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3309             @Override
3310             public void exception(AsyncReadGraph graph, Throwable t) {
3311                 t.printStackTrace();
3312             }
3313         });
3314
3315     }
3316
3317     @Override
3318     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3319         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3320     }
3321
3322     @Override
3323     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3324         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3325     }
3326
3327     @Override
3328     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3329         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3330     }
3331
3332     @Override
3333     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3334         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3335     }
3336
3337     @Override
3338     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3339         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3340     }
3341
    /**
     * Not implemented: after the session-thread check this method always throws
     * {@link Error} with the message "Not implemented!".
     *
     * @param arg0 unused
     * @param arg1 unused
     * @return never returns normally
     * @throws DatabaseException if the calling thread is one of the session threads
     */
    @Override
    public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
        assertNotSession();
        throw new Error("Not implemented!");
    }
3347
    /**
     * Not implemented: always throws {@link Error} with the message
     * "Not implemented!".
     *
     * @param arg0 unused
     * @param arg1 unused
     */
    @Override
    public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
        throw new Error("Not implemented!");
    }
3352
3353     @Override
3354     final public <T> void asyncRequest(final ExternalRead<T> request) {
3355
3356         assert(request != null);
3357
3358         asyncRequest(request, new ProcedureAdapter<T>() {
3359             @Override
3360             public void exception(Throwable t) {
3361                 t.printStackTrace();
3362             }
3363         });
3364
3365     }
3366
3367     @Override
3368     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3369         asyncRequest(request, (Procedure<T>)procedure);
3370     }
3371
3372
3373
3374     void check(Throwable t) throws DatabaseException {
3375         if(t != null) {
3376             if(t instanceof DatabaseException) throw (DatabaseException)t;
3377             else throw new DatabaseException("Unexpected exception", t);
3378         }
3379     }
3380
3381     void check(DataContainer<Throwable> container) throws DatabaseException {
3382         Throwable t = container.get();
3383         if(t != null) {
3384             if(t instanceof DatabaseException) throw (DatabaseException)t;
3385             else throw new DatabaseException("Unexpected exception", t);
3386         }
3387     }
3388
3389
3390
3391
3392
3393
3394     boolean sameProvider(Write request) {
3395         if(writeState.getGraph().provider != null) {
3396             return writeState.getGraph().provider.equals(request.getProvider());
3397         } else {
3398             return request.getProvider() == null;
3399         }
3400     }
3401
3402     boolean plainWrite(WriteGraphImpl graph) {
3403         if(graph == null) return false;
3404         if(graph.writeSupport.writeOnly()) return false;
3405         return true;
3406     }
3407
3408
3409     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3410     private void assertNotSession() throws DatabaseException {
3411         Thread current = Thread.currentThread();
3412         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3413     }
3414
3415     void assertAlive() {
3416         if (!state.isAlive())
3417             throw new RuntimeDatabaseException("Session has been shut down.");
3418     }
3419
3420     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3421         return querySupport.getValueStream(graph, querySupport.getId(resource));
3422     }
3423
3424     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3425         return querySupport.getValue(graph, querySupport.getId(resource));
3426     }
3427
    /**
     * Acquires the semaphore on behalf of the given request, delegating to the
     * three-argument overload with a {@code null} procedure.
     *
     * @param semaphore the semaphore to acquire
     * @param request the request being waited for (used in diagnostics only)
     * @throws DatabaseException declared by the delegate
     */
    <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
        acquire(semaphore, request, null);
    }
3431
3432     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3433         assertAlive();
3434         try {
3435             long tryCount = 0;
3436             // Loop until the semaphore is acquired reporting requests that take a long time.
3437             while (true) {
3438
3439                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3440                     // Acquired OK.
3441                     return;
3442                 } else {
3443                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3444                         ++tryCount;
3445                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3446                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3447                                 * tryCount;
3448                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3449                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3450                     }
3451                 }
3452
3453                 assertAlive();
3454
3455             }
3456         } catch (InterruptedException e) {
3457             e.printStackTrace();
3458             // FIXME: Should perhaps do something else in this case ??
3459         }
3460     }
3461
3462
3463
3464 //    @Override
3465 //    public boolean holdOnToTransactionAfterCancel() {
3466 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3467 //    }
3468 //
3469 //    @Override
3470 //    public boolean holdOnToTransactionAfterCommit() {
3471 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3472 //    }
3473 //
3474 //    @Override
3475 //    public boolean holdOnToTransactionAfterRead() {
3476 //        return transactionPolicy.holdOnToTransactionAfterRead();
3477 //    }
3478 //
3479 //    @Override
3480 //    public void onRelinquish() {
3481 //        transactionPolicy.onRelinquish();
3482 //    }
3483 //
3484 //    @Override
3485 //    public void onRelinquishDone() {
3486 //        transactionPolicy.onRelinquishDone();
3487 //    }
3488 //
3489 //    @Override
3490 //    public void onRelinquishError() {
3491 //        transactionPolicy.onRelinquishError();
3492 //    }
3493
3494
    /**
     * Returns this object itself as the session.
     *
     * @return {@code this}
     */
    @Override
    public Session getSession() {
        return this;
    }
3499
3500
    /**
     * Implemented by concrete session classes.
     * NOTE(review): presumably resolves the provider to use for the given
     * virtual graph; no implementation is visible here, so confirm.
     */
    protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
    /**
     * Implemented by concrete session classes.
     * NOTE(review): presumably allocates a fresh resource; no implementation is
     * visible here, so confirm.
     *
     * @throws DatabaseException declared for implementations
     */
    protected abstract ResourceImpl getNewResource() throws DatabaseException;
3503
3504     public void ceased(int thread) {
3505
3506         requestManager.ceased(thread);
3507
3508     }
3509
3510     public int getAmountOfQueryThreads() {
3511         // This must be a power of two
3512         return 4;
3513 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3514     }
3515
3516     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3517
3518         return new ResourceImpl(resourceSupport, key);
3519
3520     }
3521
3522     public void acquireWriteOnly() {
3523         writeOnly = true;
3524     }
3525
3526     public void releaseWriteOnly(ReadGraphImpl graph) {
3527         writeOnly = false;
3528         queryProvider2.releaseWrite(graph);
3529     }
3530
3531     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3532
3533         long start = System.nanoTime();
3534
3535         while(dirtyPrimitives) {
3536             dirtyPrimitives = false;
3537             getQueryProvider2().performDirtyUpdates(writer);
3538             getQueryProvider2().performScheduledUpdates(writer);
3539         }
3540
3541         fireMetadataListeners(writer, clientChanges);
3542
3543         if(DebugPolicy.PERFORMANCE_DATA) {
3544             long end = System.nanoTime();
3545             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3546         }
3547
3548     }
3549
3550     void removeTemporaryData() {
3551         File platform = Platform.getLocation().toFile();
3552         File tempFiles = new File(platform, "tempFiles");
3553         File temp = new File(tempFiles, "db");
3554         if(!temp.exists()) 
3555             return;
3556         try {
3557             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3558                 @Override
3559                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3560                     try {
3561                         Files.delete(file);
3562                     } catch (IOException e) {
3563                         Logger.defaultLogError(e);
3564                     }
3565                     return FileVisitResult.CONTINUE;
3566                 }
3567             });
3568         } catch (IOException e) {
3569             Logger.defaultLogError(e);
3570         }
3571     }
3572
3573     public void handleCreatedClusters() {
3574         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3575             createdClusters.forEach(new TLongProcedure() {
3576
3577                 @Override
3578                 public boolean execute(long value) {
3579                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3580                     cluster.setImmutable(true, clusterTranslator);
3581                     return true;
3582                 }
3583             });
3584         }
3585         createdClusters.clear();
3586     }
3587
3588     @Override
3589     public Object getModificationCounter() {
3590         return queryProvider2.modificationCounter;
3591     }
3592     
3593     @Override
3594     public void markUndoPoint() {
3595         state.setCombine(false);
3596     }
3597
3598 }