]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Multiple simultaneous readers
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryProcessor;
94 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
95 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
96 import org.simantics.db.impl.service.QueryDebug;
97 import org.simantics.db.impl.support.VirtualGraphServerSupport;
98 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
99 import org.simantics.db.procedure.AsyncListener;
100 import org.simantics.db.procedure.AsyncMultiListener;
101 import org.simantics.db.procedure.AsyncMultiProcedure;
102 import org.simantics.db.procedure.AsyncProcedure;
103 import org.simantics.db.procedure.Listener;
104 import org.simantics.db.procedure.ListenerBase;
105 import org.simantics.db.procedure.MultiListener;
106 import org.simantics.db.procedure.MultiProcedure;
107 import org.simantics.db.procedure.Procedure;
108 import org.simantics.db.procedure.SyncListener;
109 import org.simantics.db.procedure.SyncMultiListener;
110 import org.simantics.db.procedure.SyncMultiProcedure;
111 import org.simantics.db.procedure.SyncProcedure;
112 import org.simantics.db.procore.cluster.ClusterImpl;
113 import org.simantics.db.procore.cluster.ClusterTraits;
114 import org.simantics.db.procore.protocol.Constants;
115 import org.simantics.db.procore.protocol.DebugPolicy;
116 import org.simantics.db.request.AsyncMultiRead;
117 import org.simantics.db.request.AsyncRead;
118 import org.simantics.db.request.DelayedWrite;
119 import org.simantics.db.request.DelayedWriteResult;
120 import org.simantics.db.request.ExternalRead;
121 import org.simantics.db.request.MultiRead;
122 import org.simantics.db.request.Read;
123 import org.simantics.db.request.ReadInterface;
124 import org.simantics.db.request.Write;
125 import org.simantics.db.request.WriteInterface;
126 import org.simantics.db.request.WriteOnly;
127 import org.simantics.db.request.WriteOnlyResult;
128 import org.simantics.db.request.WriteResult;
129 import org.simantics.db.request.WriteTraits;
130 import org.simantics.db.service.ByteReader;
131 import org.simantics.db.service.ClusterBuilder;
132 import org.simantics.db.service.ClusterBuilderFactory;
133 import org.simantics.db.service.ClusterControl;
134 import org.simantics.db.service.ClusterSetsSupport;
135 import org.simantics.db.service.ClusterUID;
136 import org.simantics.db.service.ClusteringSupport;
137 import org.simantics.db.service.CollectionSupport;
138 import org.simantics.db.service.DebugSupport;
139 import org.simantics.db.service.DirectQuerySupport;
140 import org.simantics.db.service.GraphChangeListenerSupport;
141 import org.simantics.db.service.InitSupport;
142 import org.simantics.db.service.LifecycleSupport;
143 import org.simantics.db.service.ManagementSupport;
144 import org.simantics.db.service.QueryControl;
145 import org.simantics.db.service.SerialisationSupport;
146 import org.simantics.db.service.ServerInformation;
147 import org.simantics.db.service.ServiceActivityMonitor;
148 import org.simantics.db.service.SessionEventSupport;
149 import org.simantics.db.service.SessionMonitorSupport;
150 import org.simantics.db.service.SessionUserSupport;
151 import org.simantics.db.service.StatementSupport;
152 import org.simantics.db.service.TransactionPolicySupport;
153 import org.simantics.db.service.TransactionSupport;
154 import org.simantics.db.service.TransferableGraphSupport;
155 import org.simantics.db.service.UndoRedoSupport;
156 import org.simantics.db.service.VirtualGraphSupport;
157 import org.simantics.db.service.XSupport;
158 import org.simantics.layer0.Layer0;
159 import org.simantics.utils.DataContainer;
160 import org.simantics.utils.Development;
161 import org.simantics.utils.threads.logger.ITask;
162 import org.simantics.utils.threads.logger.ThreadLogger;
163
164 import gnu.trove.procedure.TLongProcedure;
165 import gnu.trove.set.hash.TLongHashSet;
166
167
168 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
169
170     protected static final boolean DEBUG        = false;
171
172     private static final boolean DIAGNOSTICS       = false;
173
174     private TransactionPolicySupport transactionPolicy;
175
176     final private ClusterControl clusterControl;
177
178     final protected BuiltinSupportImpl builtinSupport;
179     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
180     final protected ClusterSetsSupport clusterSetsSupport;
181     final protected LifecycleSupportImpl lifecycleSupport;
182
183     protected QuerySupportImpl querySupport;
184     protected ResourceSupportImpl resourceSupport;
185     protected WriteSupport writeSupport;
186     public ClusterTranslator clusterTranslator;
187
188     boolean dirtyPrimitives = false;
189
190     public static final int SERVICE_MODE_CREATE = 2;
191     public static final int SERVICE_MODE_ALLOW = 1;
192     public int serviceMode = 0;
193     public boolean createdImmutableClusters = false;
194     public TLongHashSet createdClusters = new TLongHashSet();
195
196     private Layer0 L0;
197
198     /**
199      * The service locator maintained by the workbench. These services are
200      * initialized during workbench during the <code>init</code> method.
201      */
202     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
203
204     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
205
206     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
207
208     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
209
210     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
211
212     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
213
214     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
215
216     final protected State                                        state              = new State();
217
218     protected GraphSession                                       graphSession       = null;
219
220     protected SessionManager                                     sessionManagerImpl = null;
221
222     protected UserAuthenticationAgent                            authAgent          = null;
223
224     protected UserAuthenticator                                  authenticator      = null;
225
226     protected Resource                                           user               = null;
227
228     protected ClusterStream                                      clusterStream      = null;
229
230     protected SessionRequestManager                         requestManager = null;
231
232     public ClusterTable                                       clusterTable       = null;
233
234     public QueryProcessor                                     queryProvider2 = null;
235
236     ClientChangesImpl                                  clientChanges      = null;
237
238     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
239
240 //    protected long                                               newClusterId       = Constants.NullClusterId;
241
242     protected int                                                flushCounter       = 0;
243
244     protected boolean                                            writeOnly          = false;
245
246     WriteState<?>                                                writeState = null;
247     WriteStateBase<?>                                            delayedWriteState = null;
248     protected Resource defaultClusterSet = null; // If not null then used for newResource().
249
250     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
251
252 //        if (authAgent == null)
253 //            throw new IllegalArgumentException("null authentication agent");
254
255         File t = StaticSessionProperties.virtualGraphStoragePath;
256         if (null == t)
257             t = new File(".");
258         this.clusterTable = new ClusterTable(this, t);
259         this.builtinSupport = new BuiltinSupportImpl(this);
260         this.sessionManagerImpl = sessionManagerImpl;
261         this.user = null;
262 //        this.authAgent = authAgent;
263
264         serviceLocator.registerService(Session.class, this);
265         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
266         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
267         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
268         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
269         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
270         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
271         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
272         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
273         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
274         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
275         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
276         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
277         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
278         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
279         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
280         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
281         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
282         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
283         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
284         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
285         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
286         ServiceActivityUpdaterForWriteTransactions.register(this);
287
288         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
289         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
290         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
291         this.lifecycleSupport = new LifecycleSupportImpl(this);
292         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
293         this.transactionPolicy = new TransactionPolicyRelease();
294         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
295         this.clusterControl = new ClusterControlImpl(this);
296         serviceLocator.registerService(ClusterControl.class, clusterControl);
297         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
298         this.clusterSetsSupport.setReadDirectory(t.toPath());
299         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
300         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
301     }
302
303     /*
304      *
305      * Session interface
306      */
307
308
309     @Override
310     public Resource getRootLibrary() {
311         return queryProvider2.getRootLibraryResource();
312     }
313
314     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
315         if (!graphSession.dbSession.refreshEnabled())
316             return;
317         try {
318             getClusterTable().refresh(csid, this, clusterUID);
319         } catch (Throwable t) {
320             Logger.defaultLogError("Refesh failed.", t);
321         }
322     }
323
324     static final class TaskHelper {
325         private final String name;
326         private Object result;
327         final Semaphore sema = new Semaphore(0);
328         private Throwable throwable = null;
329         final Consumer<DatabaseException> callback = e -> {
330             synchronized (TaskHelper.this) {
331                 throwable = e;
332             }
333         };
334         final Procedure<Object> proc = new Procedure<Object>() {
335             @Override
336             public void execute(Object result) {
337                 callback.accept(null);
338             }
339             @Override
340             public void exception(Throwable t) {
341                 if (t instanceof DatabaseException)
342                     callback.accept((DatabaseException)t);
343                 else
344                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
345             }
346         };
347         final WriteTraits writeTraits = new WriteTraits() {};
348         TaskHelper(String name) {
349             this.name = name;
350         }
351         @SuppressWarnings("unchecked")
352         <T> T getResult() {
353             return (T)result;
354         }
355         void setResult(Object result) {
356             this.result = result;
357         }
358 //        Throwable throwableGet() {
359 //            return throwable;
360 //        }
361         synchronized void throwableSet(Throwable t) {
362             throwable = t;
363         }
364 //        void throwableSet(String t) {
365 //            throwable = new InternalException("" + name + " operation failed. " + t);
366 //        }
367         synchronized void throwableCheck()
368         throws DatabaseException {
369             if (null != throwable)
370                 if (throwable instanceof DatabaseException)
371                     throw (DatabaseException)throwable;
372                 else
373                     throw new DatabaseException("Undo operation failed.", throwable);
374         }
375         void throw_(String message)
376         throws DatabaseException {
377             throw new DatabaseException("" + name + " operation failed. " + message);
378         }
379     }
380
381
382
383     private ListenerBase getListenerBase(Object procedure) {
384         if (procedure instanceof ListenerBase)
385             return (ListenerBase) procedure;
386         else
387             return null;
388     }
389
390     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
391         scheduleRequest(request, callback, notify, null);
392     }
393
394     /* (non-Javadoc)
395      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
396      */
397     @Override
398     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
399
400         assert (request != null);
401
402         if(Development.DEVELOPMENT) {
403             try {
404             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
405                 System.err.println("schedule write '" + request + "'");
406             } catch (Throwable t) {
407                 Logger.defaultLogError(t);
408             }
409         }
410
411         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
412
413         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
414
415             @Override
416             public void run(int thread) {
417
418                 if(Development.DEVELOPMENT) {
419                     try {
420                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
421                         System.err.println("perform write '" + request + "'");
422                     } catch (Throwable t) {
423                         Logger.defaultLogError(t);
424                     }
425                 }
426
427                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
428
429                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
430
431                 try {
432
433                     flushCounter = 0;
434                     Disposable.safeDispose(clientChanges);
435                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
436
437                     VirtualGraph vg = getProvider(request.getProvider());
438
439                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
440                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
441
442                         @Override
443                         public void execute(Object result) {
444                             if(callback != null) callback.accept(null);
445                         }
446
447                         @Override
448                         public void exception(Throwable t) {
449                             if(callback != null) callback.accept((DatabaseException)t);
450                         }
451
452                     });
453
454                     assert (null != writer);
455 //                    writer.state.barrier.inc();
456
457                     try {
458                         request.perform(writer);
459                         assert (null != writer);
460                     } catch (Throwable t) {
461                         if (!(t instanceof CancelTransactionException))
462                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
463                         writeState.except(t);
464                     } finally {
465 //                        writer.state.barrier.dec();
466 //                        writer.waitAsync(request);
467                     }
468
469
470                     assert(!queryProvider2.cache.dirty);
471
472                 } catch (Throwable e) {
473
474                     // Log it first, just to be safe that the error is always logged.
475                     if (!(e instanceof CancelTransactionException))
476                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
477
478 //                    writeState.getGraph().state.barrier.dec();
479 //                    writeState.getGraph().waitAsync(request);
480
481                     writeState.except(e);
482
483 //                    try {
484 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
485 //                        // All we can do here is to log those, can't really pass them anywhere.
486 //                        if (callback != null) {
487 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
488 //                            else callback.run(new DatabaseException(e));
489 //                        }
490 //                    } catch (Throwable e2) {
491 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
492 //                    }
493 //                    boolean empty = clusterStream.reallyFlush();
494 //                    int callerThread = -1;
495 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
496 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
497 //                    try {
498 //                        // Send all local changes to server so it can calculate correct reverse change set.
499 //                        // This will call clusterStream.accept().
500 //                        state.cancelCommit(context, clusterStream);
501 //                        if (!empty) {
502 //                            if (!context.isOk()) // this is a blocking operation
503 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
504 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
505 //                        }
506 //                        state.cancelCommit2(context, clusterStream);
507 //                    } catch (DatabaseException e3) {
508 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
509 //                    } finally {
510 //                        clusterStream.setOff(false);
511 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
512 //                    }
513
514                 } finally {
515
516                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
517
518                 }
519
520                 task.finish();
521
522                 if(Development.DEVELOPMENT) {
523                     try {
524                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
525                         System.err.println("finish write '" + request + "'");
526                     } catch (Throwable t) {
527                         Logger.defaultLogError(t);
528                     }
529                 }
530
531             }
532
533         }, combine);
534
535     }
536
537     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
538         scheduleRequest(request, procedure, notify, null);
539     }
540
541     /* (non-Javadoc)
542      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
543      */
544     @Override
545     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
546
547         assert (request != null);
548
549         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
550
551         requestManager.scheduleWrite(new SessionTask(request, thread) {
552
553             @Override
554             public void run(int thread) {
555
556                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
557
558                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
559
560                 flushCounter = 0;
561                 Disposable.safeDispose(clientChanges);
562                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
563
564                 VirtualGraph vg = getProvider(request.getProvider());
565                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
566
567                 try {
568                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
569                     writeState = writeStateT;
570
571                     assert (null != writer);
572 //                    writer.state.barrier.inc();
573                     writeStateT.setResult(request.perform(writer));
574                     assert (null != writer);
575
576 //                    writer.state.barrier.dec();
577 //                    writer.waitAsync(null);
578
579                 } catch (Throwable e) {
580
581 //                    writer.state.barrier.dec();
582 //                    writer.waitAsync(null);
583
584                     writeState.except(e);
585
586 //                  state.stopWriteTransaction(clusterStream);
587 //
588 //              } catch (Throwable e) {
589 //                  // Log it first, just to be safe that the error is always logged.
590 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
591 //
592 //                  try {
593 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
594 //                      // All we can do here is to log those, can't really pass them anywhere.
595 //                      if (procedure != null) {
596 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
597 //                          else procedure.exception(new DatabaseException(e));
598 //                      }
599 //                  } catch (Throwable e2) {
600 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
601 //                  }
602 //
603 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
604 //
605 //                  state.stopWriteTransaction(clusterStream);
606
607                 } finally {
608                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
609                 }
610
611 //              if(notify != null) notify.release();
612
613                 task.finish();
614
615             }
616
617         }, combine);
618
619     }
620
621     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
622         scheduleRequest(request, callback, notify, null);
623     }
624
625     /* (non-Javadoc)
626      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
627      */
628     @Override
629     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
630
631         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
632
633         assert (request != null);
634
635         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
636
637         requestManager.scheduleWrite(new SessionTask(request, thread) {
638
639             @Override
640             public void run(int thread) {
641                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
642
643                 Procedure<Object> stateProcedure = new Procedure<Object>() {
644                     @Override
645                     public void execute(Object result) {
646                         if (callback != null)
647                             callback.accept(null);
648                     }
649                     @Override
650                     public void exception(Throwable t) {
651                         if (callback != null) {
652                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
653                             else callback.accept(new DatabaseException(t));
654                         } else
655                             Logger.defaultLogError("Unhandled exception", t);
656                     }
657                 };
658
659                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
660                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
661                 DelayedWriteGraph dwg = null;
662 //                newGraph.state.barrier.inc();
663
664                 try {
665                     dwg = new DelayedWriteGraph(newGraph);
666                     request.perform(dwg);
667                 } catch (Throwable e) {
668                     delayedWriteState.except(e);
669                     total.finish();
670                     dwg.close();
671                     return;
672                 } finally {
673 //                    newGraph.state.barrier.dec();
674 //                    newGraph.waitAsync(request);
675                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
676                 }
677
678                 delayedWriteState = null;
679
680                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
681                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
682
683                 flushCounter = 0;
684                 Disposable.safeDispose(clientChanges);
685                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
686
687                 acquireWriteOnly();
688
689                 VirtualGraph vg = getProvider(request.getProvider());
690                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
691
692                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
693
694                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
695
696                 assert (null != writer);
697 //                writer.state.barrier.inc();
698
699                 try {
700                     // Cannot cancel
701                     dwg.commit(writer, request);
702
703                         if(defaultClusterSet != null) {
704                                 XSupport xs = getService(XSupport.class);
705                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
706                                 ClusteringSupport cs = getService(ClusteringSupport.class);
707                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
708                         }
709
710                     // This makes clusters available from server
711                     clusterStream.reallyFlush();
712
713                     releaseWriteOnly(writer);
714                     handleUpdatesAndMetadata(writer);
715
716                 } catch (ServiceException e) {
717 //                    writer.state.barrier.dec();
718 //                    writer.waitAsync(null);
719
720                     // These shall be requested from server
721                     clusterTable.removeWriteOnlyClusters();
722                     // This makes clusters available from server
723                     clusterStream.reallyFlush();
724
725                     releaseWriteOnly(writer);
726                     writeState.except(e);
727                 } finally {
728                     // Debugging & Profiling
729                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
730                     task2.finish();
731                     total.finish();
732                 }
733             }
734
735         }, combine);
736
737     }
738
739     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
740         scheduleRequest(request, procedure, notify, null);
741     }
742
743     /* (non-Javadoc)
744      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
745      */
746     @Override
747     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
748         throw new Error("Not implemented");
749     }
750
751     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
752         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
753         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
754             createdClusters.add(cluster.clusterId);
755         }
756         return cluster;
757     }
758
759     class WriteOnlySupport implements WriteSupport {
760
761         ClusterStream stream;
762         ClusterImpl currentCluster;
763
764         public WriteOnlySupport() {
765             this.stream = clusterStream;
766         }
767
768         @Override
769         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
770             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
771         }
772
773         @Override
774         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
775
776             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
777
778             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
779                 if(s != queryProvider2.getRootLibrary())
780                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
781
782             try {
783                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
784             } catch (DatabaseException e) {
785                 Logger.defaultLogError(e);
786                 //throw e;
787             }
788
789             clientChanges.invalidate(s);
790
791             if (cluster.isWriteOnly())
792                 return;
793             queryProvider2.updateStatements(s, p);
794
795         }
796
797         @Override
798         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
799             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
800         }
801
802         @Override
803         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
804
805             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
806             try {
807                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
808             } catch (DatabaseException e) {
809                 Logger.defaultLogError(e);
810             }
811
812             clientChanges.invalidate(rid);
813
814             if (cluster.isWriteOnly())
815                 return;
816             queryProvider2.updateValue(rid);
817
818         }
819
820         @Override
821         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
822
823             if(amount < 65536) {
824                 claimValue(provider,resource, reader.readBytes(null, amount));
825                 return;
826             }
827
828             byte[] bytes = new byte[65536];
829
830             int rid = ((ResourceImpl)resource).id;
831             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
832             try {
833                 int left = amount;
834                 while(left > 0) {
835                     int block = Math.min(left, 65536);
836                     reader.readBytes(bytes, block);
837                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
838                     left -= block;
839                 }
840             } catch (DatabaseException e) {
841                 Logger.defaultLogError(e);
842             }
843
844             clientChanges.invalidate(rid);
845
846             if (cluster.isWriteOnly())
847                 return;
848             queryProvider2.updateValue(rid);
849
850         }
851
852         private void maintainCluster(ClusterImpl before, ClusterI after_) {
853             if(after_ != null && after_ != before) {
854                 ClusterImpl after = (ClusterImpl)after_;
855                 if(currentCluster == before) {
856                     currentCluster = after;
857                 }
858                 clusterTable.replaceCluster(after);
859             }
860         }
861
862         public int createResourceKey(int foreignCounter) throws DatabaseException {
863             if(currentCluster == null) {
864                 currentCluster = getNewResourceCluster();
865             }
866             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
867                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
868                 newCluster.foreignLookup = new byte[foreignCounter];
869                 currentCluster = newCluster;
870                 if (DEBUG)
871                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
872             }
873             return currentCluster.createResource(clusterTranslator);
874         }
875
876         @Override
877         public Resource createResource(VirtualGraph provider) throws DatabaseException {
878             if(currentCluster == null) {
879                 if (null != defaultClusterSet) {
880                         ResourceImpl result = getNewResource(defaultClusterSet);
881                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
882                     return result;
883                 } else {
884                     currentCluster = getNewResourceCluster();
885                 }
886             }
887             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
888                 if (null != defaultClusterSet) {
889                         ResourceImpl result = getNewResource(defaultClusterSet);
890                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
891                     return result;
892                 } else {
893                     currentCluster = getNewResourceCluster();
894                 }
895             }
896             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
897         }
898
899         @Override
900         public Resource createResource(VirtualGraph provider, long clusterId)
901         throws DatabaseException {
902             return getNewResource(clusterId);
903         }
904
905         @Override
906         public Resource createResource(VirtualGraph provider, Resource clusterSet)
907         throws DatabaseException {
908             return getNewResource(clusterSet);
909         }
910
911         @Override
912         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
913                 throws DatabaseException {
914             getNewClusterSet(clusterSet);
915         }
916
917         @Override
918         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
919         throws ServiceException {
920             return containsClusterSet(clusterSet);
921         }
922
923         public void selectCluster(long cluster) {
924                 currentCluster = clusterTable.getClusterByClusterId(cluster);
925                 long setResourceId = clusterSetsSupport.getSet(cluster);
926                 clusterSetsSupport.put(setResourceId, cluster);
927         }
928
929         @Override
930         public Resource setDefaultClusterSet(Resource clusterSet)
931         throws ServiceException {
932                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
933                 if(clusterSet != null) {
934                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
935                         currentCluster = clusterTable.getClusterByClusterId(id);
936                         return result;
937                 } else {
938                         currentCluster = null;
939                         return null;
940                 }
941         }
942
943         @Override
944         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
945
946                  provider = getProvider(provider);
947              if (null == provider) {
948                  int key = ((ResourceImpl)resource).id;
949
950
951
952                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
953 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
954 //                      if(key != queryProvider2.getRootLibrary())
955 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
956
957 //                 try {
958 //                     cluster.removeValue(key, clusterTranslator);
959 //                 } catch (DatabaseException e) {
960 //                     Logger.defaultLogError(e);
961 //                     return;
962 //                 }
963
964                  clusterTable.writeOnlyInvalidate(cluster);
965
966                  try {
967                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
968                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
969                          clusterTranslator.removeValue(cluster);
970                  } catch (DatabaseException e) {
971                          Logger.defaultLogError(e);
972                  }
973
974                  queryProvider2.invalidateResource(key);
975                  clientChanges.invalidate(key);
976
977              } else {
978                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
979                  queryProvider2.updateValue(querySupport.getId(resource));
980                  clientChanges.claimValue(resource);
981              }
982
983
984         }
985
986         @Override
987         public void flush(boolean intermediate) {
988             throw new UnsupportedOperationException();
989         }
990
991         @Override
992         public void flushCluster() {
993             clusterTable.flushCluster(graphSession);
994             if(defaultClusterSet != null) {
995                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
996             }
997             currentCluster = null;
998         }
999
1000         @Override
1001         public void flushCluster(Resource r) {
1002             throw new UnsupportedOperationException("flushCluster resource " + r);
1003         }
1004
1005         @Override
1006         public void gc() {
1007         }
1008
1009         @Override
1010         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1011                 Resource object) {
1012
1013                 int s = ((ResourceImpl)subject).id;
1014                 int p = ((ResourceImpl)predicate).id;
1015                 int o = ((ResourceImpl)object).id;
1016
1017                 provider = getProvider(provider);
1018                 if (null == provider) {
1019
1020                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1021                         clusterTable.writeOnlyInvalidate(cluster);
1022
1023                         try {
1024
1025                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1026                                 int pResourceKey = ClusterTraits.getResourceIndexFromResourceKey(p);
1027                                 int oResourceKey = ClusterTraits.getResourceIndexFromResourceKey(o);
1028
1029                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1030                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1031
1032                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1033                                 clusterTranslator.addStatementIndex(cluster, pResourceKey, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1034                                 clusterTranslator.addStatementIndex(cluster, oResourceKey, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1035                                 clusterTranslator.removeStatement(cluster);
1036
1037                                 queryProvider2.invalidateResource(s);
1038                                 clientChanges.invalidate(s);
1039
1040                         } catch (DatabaseException e) {
1041
1042                                 Logger.defaultLogError(e);
1043
1044                         }
1045
1046                         return true;
1047
1048                 } else {
1049                         
1050                         ((VirtualGraphImpl)provider).deny(s, p, o);
1051                         queryProvider2.invalidateResource(s);
1052                         clientChanges.invalidate(s);
1053
1054                         return true;
1055
1056                 }
1057
1058         }
1059
1060         @Override
1061         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1062             throw new UnsupportedOperationException();
1063         }
1064
1065         @Override
1066         public boolean writeOnly() {
1067             return true;
1068         }
1069
1070         @Override
1071         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1072             writeSupport.performWriteRequest(graph, request);
1073         }
1074
1075         @Override
1076         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1077             throw new UnsupportedOperationException();
1078         }
1079
1080         @Override
1081         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1082             throw new UnsupportedOperationException();
1083         }
1084
1085         @Override
1086         public <T> void addMetadata(Metadata data) throws ServiceException {
1087             writeSupport.addMetadata(data);
1088         }
1089
1090         @Override
1091         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1092             return writeSupport.getMetadata(clazz);
1093         }
1094
1095         @Override
1096         public TreeMap<String, byte[]> getMetadata() {
1097             return writeSupport.getMetadata();
1098         }
1099
1100         @Override
1101         public void commitDone(WriteTraits writeTraits, long csid) {
1102             writeSupport.commitDone(writeTraits, csid);
1103         }
1104
1105         @Override
1106         public void clearUndoList(WriteTraits writeTraits) {
1107             writeSupport.clearUndoList(writeTraits);
1108         }
1109         @Override
1110         public int clearMetadata() {
1111             return writeSupport.clearMetadata();
1112         }
1113
1114                 @Override
1115                 public void startUndo() {
1116                         writeSupport.startUndo();
1117                 }
1118     }
1119
1120     class VirtualWriteOnlySupport implements WriteSupport {
1121
1122 //        @Override
1123 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1124 //                Resource object) {
1125 //            throw new UnsupportedOperationException();
1126 //        }
1127
1128         @Override
1129         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1130
1131             TransientGraph impl = (TransientGraph)provider;
1132             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1133             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1134             clientChanges.claim(subject, predicate, object);
1135
1136         }
1137
1138         @Override
1139         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1140
1141             TransientGraph impl = (TransientGraph)provider;
1142             impl.claim(subject, predicate, object);
1143             getQueryProvider2().updateStatements(subject, predicate);
1144             clientChanges.claim(subject, predicate, object);
1145
1146         }
1147
1148         @Override
1149         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1150             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1151         }
1152
1153         @Override
1154         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1155             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1156             getQueryProvider2().updateValue(resource);
1157             clientChanges.claimValue(resource);
1158         }
1159
1160         @Override
1161         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1162             byte[] value = reader.readBytes(null, amount);
1163             claimValue(provider, resource, value);
1164         }
1165
1166         @Override
1167         public Resource createResource(VirtualGraph provider) {
1168             TransientGraph impl = (TransientGraph)provider;
1169             return impl.getResource(impl.newResource(false));
1170         }
1171
1172         @Override
1173         public Resource createResource(VirtualGraph provider, long clusterId) {
1174             throw new UnsupportedOperationException();
1175         }
1176
1177         @Override
1178         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1179         throws DatabaseException {
1180             throw new UnsupportedOperationException();
1181         }
1182
1183         @Override
1184         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1185         throws DatabaseException {
1186             throw new UnsupportedOperationException();
1187         }
1188
1189         @Override
1190         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1191         throws ServiceException {
1192             throw new UnsupportedOperationException();
1193         }
1194
1195         @Override
1196         public Resource setDefaultClusterSet(Resource clusterSet)
1197         throws ServiceException {
1198                 return null;
1199         }
1200
1201         @Override
1202         public void denyValue(VirtualGraph provider, Resource resource) {
1203             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1204             getQueryProvider2().updateValue(querySupport.getId(resource));
1205             // NOTE: this only keeps track of value changes by-resource.
1206             clientChanges.claimValue(resource);
1207         }
1208
1209         @Override
1210         public void flush(boolean intermediate) {
1211             throw new UnsupportedOperationException();
1212         }
1213
1214         @Override
1215         public void flushCluster() {
1216             throw new UnsupportedOperationException();
1217         }
1218
1219         @Override
1220         public void flushCluster(Resource r) {
1221             throw new UnsupportedOperationException("Resource " + r);
1222         }
1223
1224         @Override
1225         public void gc() {
1226         }
1227
1228         @Override
1229         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1230             TransientGraph impl = (TransientGraph) provider;
1231             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1232             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1233             clientChanges.deny(subject, predicate, object);
1234             return true;
1235         }
1236
1237         @Override
1238         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1239             throw new UnsupportedOperationException();
1240         }
1241
1242         @Override
1243         public boolean writeOnly() {
1244             return true;
1245         }
1246
1247         @Override
1248         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1249             throw new UnsupportedOperationException();
1250         }
1251
1252         @Override
1253         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1254             throw new UnsupportedOperationException();
1255         }
1256
1257         @Override
1258         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1259             throw new UnsupportedOperationException();
1260         }
1261
1262         @Override
1263         public <T> void addMetadata(Metadata data) throws ServiceException {
1264             throw new UnsupportedOperationException();
1265         }
1266
1267         @Override
1268         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1269             throw new UnsupportedOperationException();
1270         }
1271
1272         @Override
1273         public TreeMap<String, byte[]> getMetadata() {
1274             throw new UnsupportedOperationException();
1275         }
1276
1277         @Override
1278         public void commitDone(WriteTraits writeTraits, long csid) {
1279         }
1280
1281         @Override
1282         public void clearUndoList(WriteTraits writeTraits) {
1283         }
1284
1285         @Override
1286         public int clearMetadata() {
1287             return 0;
1288         }
1289
1290                 @Override
1291                 public void startUndo() {
1292                 }
1293     }
1294
1295     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1296
1297         try {
1298
1299             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1300
1301             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1302
1303             flushCounter = 0;
1304             Disposable.safeDispose(clientChanges);
1305             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1306
1307             acquireWriteOnly();
1308
1309             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1310
1311             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1312
1313             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1314             writeState = writeStateT;
1315
1316             assert (null != writer);
1317 //            writer.state.barrier.inc();
1318             long start = System.nanoTime();
1319             T result = request.perform(writer);
1320             long duration = System.nanoTime() - start;
1321             if (DEBUG) {
1322                 System.err.println("################");
1323                 System.err.println("WriteOnly duration " + 1e-9*duration);
1324             }
1325             writeStateT.setResult(result);
1326
1327             // This makes clusters available from server
1328             clusterStream.reallyFlush();
1329
1330             // This will trigger query updates
1331             releaseWriteOnly(writer);
1332             assert (null != writer);
1333
1334             handleUpdatesAndMetadata(writer);
1335
1336         } catch (CancelTransactionException e) {
1337
1338             releaseWriteOnly(writeState.getGraph());
1339
1340             clusterTable.removeWriteOnlyClusters();
1341             state.stopWriteTransaction(clusterStream);
1342
1343         } catch (Throwable e) {
1344
1345             e.printStackTrace();
1346
1347             releaseWriteOnly(writeState.getGraph());
1348
1349             clusterTable.removeWriteOnlyClusters();
1350
1351             if (callback != null)
1352                 callback.exception(new DatabaseException(e));
1353
1354             state.stopWriteTransaction(clusterStream);
1355             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1356
1357         } finally  {
1358
1359             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1360
1361         }
1362
1363     }
1364
1365     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1366         scheduleRequest(request, callback, notify, null);
1367     }
1368
1369     @Override
1370     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1371
1372         assertAlive();
1373
1374         assert (request != null);
1375
1376         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1377
1378         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1379
1380             @Override
1381             public void run(int thread) {
1382
1383                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1384
1385                     try {
1386
1387                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1388
1389                         flushCounter = 0;
1390                         Disposable.safeDispose(clientChanges);
1391                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1392
1393                         acquireWriteOnly();
1394
1395                         VirtualGraph vg = getProvider(request.getProvider());
1396                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1397
1398                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1399
1400                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1401
1402                             @Override
1403                             public void execute(Object result) {
1404                                 if(callback != null) callback.accept(null);
1405                             }
1406
1407                             @Override
1408                             public void exception(Throwable t) {
1409                                 if(callback != null) callback.accept((DatabaseException)t);
1410                             }
1411
1412                         });
1413
1414                         assert (null != writer);
1415 //                        writer.state.barrier.inc();
1416
1417                         try {
1418
1419                             request.perform(writer);
1420
1421                         } catch (Throwable e) {
1422
1423 //                            writer.state.barrier.dec();
1424 //                            writer.waitAsync(null);
1425
1426                             releaseWriteOnly(writer);
1427
1428                             clusterTable.removeWriteOnlyClusters();
1429
1430                             if(!(e instanceof CancelTransactionException)) {
1431                                 if (callback != null)
1432                                     callback.accept(new DatabaseException(e));
1433                             }
1434
1435                             writeState.except(e);
1436
1437                             return;
1438
1439                         }
1440
1441                         // This makes clusters available from server
1442                         boolean empty = clusterStream.reallyFlush();
1443                         // This was needed to make WO requests call metadata listeners.
1444                         // NOTE: the calling event does not contain clientChanges information.
1445                         if (!empty && clientChanges.isEmpty())
1446                             clientChanges.setNotEmpty(true);
1447                         releaseWriteOnly(writer);
1448                         assert (null != writer);
1449                     } finally  {
1450
1451                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1452
1453                     }
1454
1455
1456                 task.finish();
1457
1458             }
1459
1460         }, combine);
1461
1462     }
1463
1464     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1465         scheduleRequest(request, callback, notify, null);
1466     }
1467
1468     /* (non-Javadoc)
1469      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1470      */
1471     @Override
1472     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1473
1474         assert (request != null);
1475
1476         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1477
1478         requestManager.scheduleWrite(new SessionTask(request, thread) {
1479
1480             @Override
1481             public void run(int thread) {
1482
1483                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1484
1485                 performWriteOnly(request, notify, callback);
1486
1487                 task.finish();
1488
1489             }
1490
1491         }, combine);
1492
1493     }
1494
1495     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1496
1497         assert (request != null);
1498         assert (procedure != null);
1499
1500         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1501
1502         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1503
1504             @Override
1505             public void run(int thread) {
1506
1507                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1508
1509                 ListenerBase listener = getListenerBase(procedure);
1510
1511                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1512
1513                 try {
1514
1515                     if (listener != null) {
1516
1517                         try {
1518                                 
1519                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1520
1521                                         @Override
1522                                         public void exception(AsyncReadGraph graph, Throwable t) {
1523                                                 procedure.exception(graph, t);
1524                                                 if(throwable != null) {
1525                                                         throwable.set(t);
1526                                                 } else {
1527                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1528                                                 }
1529                                         }
1530
1531                                         @Override
1532                                         public void execute(AsyncReadGraph graph, T t) {
1533                                                 if(result != null) result.set(t);
1534                                                 procedure.execute(graph, t);
1535                                         }
1536
1537                                 };
1538                                 
1539                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1540                                 
1541                         } catch (Throwable t) {
1542                             // This is handled by the AsyncProcedure
1543                                 //Logger.defaultLogError("Internal error", t);
1544                         }
1545
1546                     } else {
1547
1548                         try {
1549
1550 //                            newGraph.state.barrier.inc();
1551
1552                             T t = request.perform(newGraph);
1553
1554                             try {
1555
1556                                 if(result != null) result.set(t);
1557                                 procedure.execute(newGraph, t);
1558
1559                             } catch (Throwable th) {
1560
1561                                 if(throwable != null) {
1562                                     throwable.set(th);
1563                                 } else {
1564                                     Logger.defaultLogError("Unhandled exception", th);
1565                                 }
1566
1567                             }
1568
1569                         } catch (Throwable t) {
1570
1571                             if (DEBUG)
1572                                 t.printStackTrace();
1573
1574                             if(throwable != null) {
1575                                 throwable.set(t);
1576                             } else {
1577                                 Logger.defaultLogError("Unhandled exception", t);
1578                             }
1579
1580                             try {
1581
1582                                 procedure.exception(newGraph, t);
1583
1584                             } catch (Throwable t2) {
1585
1586                                 if(throwable != null) {
1587                                     throwable.set(t2);
1588                                 } else {
1589                                     Logger.defaultLogError("Unhandled exception", t2);
1590                                 }
1591
1592                             }
1593
1594                         }
1595
1596 //                        newGraph.state.barrier.dec();
1597 //                        newGraph.waitAsync(request);
1598
1599                     }
1600
1601                 } finally {
1602
1603                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1604
1605                 }
1606
1607             }
1608
1609         });
1610
1611     }
1612
1613     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1614
1615         assert (request != null);
1616         assert (procedure != null);
1617
1618         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1619
1620         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1621
1622             @Override
1623             public void run(int thread) {
1624
1625                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1626
1627                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1628
1629                 try {
1630
1631                     if (listener != null) {
1632
1633                         try {
1634                                 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1635                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1636                                                 } catch (DatabaseException e) {
1637                                                         Logger.defaultLogError(e);
1638                                                 }
1639
1640                     } else {
1641
1642                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1643                                 procedure, "request");
1644
1645                         try {
1646
1647 //                            newGraph.state.barrier.inc();
1648
1649                             request.perform(newGraph, wrapper);
1650
1651 //                            newGraph.waitAsync(request);
1652
1653                         } catch (Throwable t) {
1654
1655                             wrapper.exception(newGraph, t);
1656 //                            newGraph.waitAsync(request);
1657
1658
1659                         }
1660
1661                     }
1662
1663                 } finally {
1664
1665                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1666
1667                 }
1668
1669             }
1670
1671         });
1672
1673     }
1674
1675     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1676
1677         assert (request != null);
1678         assert (procedure != null);
1679
1680         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1681
1682         int sync = notify != null ? thread : -1;
1683
1684         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1685
1686             @Override
1687             public void run(int thread) {
1688
1689                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1690
1691                 ListenerBase listener = getListenerBase(procedure);
1692
1693                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1694
1695                 try {
1696
1697                     if (listener != null) {
1698
1699                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1700
1701 //                        newGraph.waitAsync(request);
1702
1703                     } else {
1704
1705                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1706
1707                         try {
1708
1709                             request.perform(newGraph, wrapper);
1710
1711                         } catch (Throwable t) {
1712
1713                             t.printStackTrace();
1714
1715                         }
1716
1717                     }
1718
1719                 } finally {
1720
1721                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1722
1723                 }
1724
1725             }
1726
1727         });
1728
1729     }
1730
1731     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1732
1733         assert (request != null);
1734         assert (procedure != null);
1735
1736         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1737
1738         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1739
1740             @Override
1741             public void run(int thread) {
1742
1743                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1744
1745                 ListenerBase listener = getListenerBase(procedure);
1746
1747                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1748
1749                 try {
1750
1751                     if (listener != null) {
1752
1753                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1754
1755                             @Override
1756                             public void exception(Throwable t) {
1757                                 procedure.exception(t);
1758                                 if(throwable != null) {
1759                                     throwable.set(t);
1760                                 }
1761                             }
1762
1763                             @Override
1764                             public void execute(T t) {
1765                                 if(result != null) result.set(t);
1766                                 procedure.execute(t);
1767                             }
1768
1769                         }, listener);
1770
1771 //                        newGraph.waitAsync(request);
1772
1773                     } else {
1774
1775 //                        newGraph.state.barrier.inc();
1776
1777                         request.register(newGraph, new Listener<T>() {
1778
1779                             @Override
1780                             public void exception(Throwable t) {
1781                                 if(throwable != null) throwable.set(t);
1782                                 procedure.exception(t);
1783 //                                newGraph.state.barrier.dec();
1784                             }
1785
1786                             @Override
1787                             public void execute(T t) {
1788                                 if(result != null) result.set(t);
1789                                 procedure.execute(t);
1790 //                                newGraph.state.barrier.dec();
1791                             }
1792
1793                             @Override
1794                             public boolean isDisposed() {
1795                                 return true;
1796                             }
1797
1798                         });
1799
1800 //                        newGraph.waitAsync(request);
1801
1802                     }
1803
1804                 } finally {
1805
1806                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1807
1808                 }
1809
1810             }
1811
1812         });
1813
1814     }
1815
1816
1817     @Override
1818     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1819
1820         scheduleRequest(request, procedure, null, null, null);
1821
1822     }
1823
1824     @Override
1825     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1826         assertNotSession();
1827         Semaphore notify = new Semaphore(0);
1828         DataContainer<Throwable> container = new DataContainer<Throwable>();
1829         DataContainer<T> result = new DataContainer<T>();
1830         scheduleRequest(request, procedure, notify, container, result);
1831         acquire(notify, request);
1832         Throwable throwable = container.get();
1833         if(throwable != null) {
1834             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1835             else throw new DatabaseException("Unexpected exception", throwable);
1836         }
1837         return result.get();
1838     }
1839
1840     @Override
1841     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1842         assertNotSession();
1843         Semaphore notify = new Semaphore(0);
1844         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1845         final DataContainer<T> resultContainer = new DataContainer<T>();
1846         scheduleRequest(request, new AsyncProcedure<T>() {
1847             @Override
1848             public void exception(AsyncReadGraph graph, Throwable throwable) {
1849                 exceptionContainer.set(throwable);
1850                 procedure.exception(graph, throwable);
1851             }
1852             @Override
1853             public void execute(AsyncReadGraph graph, T result) {
1854                 resultContainer.set(result);
1855                 procedure.execute(graph, result);
1856             }
1857         }, getListenerBase(procedure), notify);
1858         acquire(notify, request, procedure);
1859         Throwable throwable = exceptionContainer.get();
1860         if (throwable != null) {
1861             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1862             else throw new DatabaseException("Unexpected exception", throwable);
1863         }
1864         return resultContainer.get();
1865     }
1866
1867
1868     @Override
1869     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1870         assertNotSession();
1871         Semaphore notify = new Semaphore(0);
1872         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1873         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1874             @Override
1875             public void exception(AsyncReadGraph graph, Throwable throwable) {
1876                 exceptionContainer.set(throwable);
1877                 procedure.exception(graph, throwable);
1878             }
1879             @Override
1880             public void execute(AsyncReadGraph graph, T result) {
1881                 procedure.execute(graph, result);
1882             }
1883             @Override
1884             public void finished(AsyncReadGraph graph) {
1885                 procedure.finished(graph);
1886             }
1887         }, notify);
1888         acquire(notify, request, procedure);
1889         Throwable throwable = exceptionContainer.get();
1890         if (throwable != null) {
1891             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1892             else throw new DatabaseException("Unexpected exception", throwable);
1893         }
1894         // TODO: implement return value
1895         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1896         return null;
1897     }
1898
1899     @Override
1900     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1901         return syncRequest(request, new ProcedureAdapter<T>());
1902
1903
1904 //        assert(request != null);
1905 //
1906 //        final DataContainer<T> result = new DataContainer<T>();
1907 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1908 //
1909 //        syncRequest(request, new Procedure<T>() {
1910 //
1911 //            @Override
1912 //            public void execute(T t) {
1913 //                result.set(t);
1914 //            }
1915 //
1916 //            @Override
1917 //            public void exception(Throwable t) {
1918 //                exception.set(t);
1919 //            }
1920 //
1921 //        });
1922 //
1923 //        Throwable t = exception.get();
1924 //        if(t != null) {
1925 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1926 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1927 //        }
1928 //
1929 //        return result.get();
1930
1931     }
1932
1933     @Override
1934     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1935         return syncRequest(request, (Procedure<T>)procedure);
1936     }
1937
1938
1939 //    @Override
1940 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1941 //        assertNotSession();
1942 //        Semaphore notify = new Semaphore(0);
1943 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1944 //        DataContainer<T> result = new DataContainer<T>();
1945 //        scheduleRequest(request, procedure, notify, container, result);
1946 //        acquire(notify, request);
1947 //        Throwable throwable = container.get();
1948 //        if(throwable != null) {
1949 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1950 //            else throw new DatabaseException("Unexpected exception", throwable);
1951 //        }
1952 //        return result.get();
1953 //    }
1954
1955
1956     @Override
1957     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1958         assertNotSession();
1959         Semaphore notify = new Semaphore(0);
1960         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1961         final DataContainer<T> result = new DataContainer<T>();
1962         scheduleRequest(request, procedure, notify, container, result);
1963         acquire(notify, request);
1964         Throwable throwable = container.get();
1965         if (throwable != null) {
1966             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1967             else throw new DatabaseException("Unexpected exception", throwable);
1968         }
1969         return result.get();
1970     }
1971
1972     @Override
1973     public void syncRequest(Write request) throws DatabaseException  {
1974         assertNotSession();
1975         assertAlive();
1976         Semaphore notify = new Semaphore(0);
1977         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1978         scheduleRequest(request, e -> exception.set(e), notify);
1979         acquire(notify, request);
1980         if(exception.get() != null) throw exception.get();
1981     }
1982
1983     @Override
1984     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1985         assertNotSession();
1986         Semaphore notify = new Semaphore(0);
1987         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1988         final DataContainer<T> result = new DataContainer<T>();
1989         scheduleRequest(request, new Procedure<T>() {
1990
1991             @Override
1992             public void exception(Throwable t) {
1993               exception.set(t);
1994             }
1995
1996             @Override
1997             public void execute(T t) {
1998               result.set(t);
1999             }
2000
2001         }, notify);
2002         acquire(notify, request);
2003         if(exception.get() != null) {
2004             Throwable t = exception.get();
2005             if(t instanceof DatabaseException) throw (DatabaseException)t;
2006             else throw new DatabaseException(t);
2007         }
2008         return result.get();
2009     }
2010
2011     @Override
2012     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2013         assertNotSession();
2014         Semaphore notify = new Semaphore(0);
2015         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2016         final DataContainer<T> result = new DataContainer<T>();
2017         scheduleRequest(request, new Procedure<T>() {
2018
2019             @Override
2020             public void exception(Throwable t) {
2021               exception.set(t);
2022             }
2023
2024             @Override
2025             public void execute(T t) {
2026               result.set(t);
2027             }
2028
2029         }, notify);
2030         acquire(notify, request);
2031         if(exception.get() != null) {
2032             Throwable t = exception.get();
2033             if(t instanceof DatabaseException) throw (DatabaseException)t;
2034             else throw new DatabaseException(t);
2035         }
2036         return result.get();
2037     }
2038
2039     @Override
2040     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2041         assertNotSession();
2042         Semaphore notify = new Semaphore(0);
2043         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2044         final DataContainer<T> result = new DataContainer<T>();
2045         scheduleRequest(request, new Procedure<T>() {
2046
2047             @Override
2048             public void exception(Throwable t) {
2049               exception.set(t);
2050             }
2051
2052             @Override
2053             public void execute(T t) {
2054               result.set(t);
2055             }
2056
2057         }, notify);
2058         acquire(notify, request);
2059         if(exception.get() != null) {
2060             Throwable t = exception.get();
2061             if(t instanceof DatabaseException) throw (DatabaseException)t;
2062             else throw new DatabaseException(t);
2063         }
2064         return result.get();
2065     }
2066
2067     @Override
2068     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2069         assertNotSession();
2070         Semaphore notify = new Semaphore(0);
2071         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2072         scheduleRequest(request, e -> exception.set(e), notify);
2073         acquire(notify, request);
2074         if(exception.get() != null) throw exception.get();
2075     }
2076
2077     @Override
2078     public void syncRequest(WriteOnly request) throws DatabaseException  {
2079         assertNotSession();
2080         assertAlive();
2081         Semaphore notify = new Semaphore(0);
2082         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2083         scheduleRequest(request, e -> exception.set(e), notify);
2084         acquire(notify, request);
2085         if(exception.get() != null) throw exception.get();
2086     }
2087
2088     /*
2089      *
2090      * GraphRequestProcessor interface
2091      */
2092
2093     @Override
2094     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2095
2096         scheduleRequest(request, procedure, null, null);
2097
2098     }
2099
2100     @Override
2101     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2102
2103         scheduleRequest(request, procedure, null);
2104
2105     }
2106
2107     @Override
2108     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2109
2110         scheduleRequest(request, procedure, null, null, null);
2111
2112     }
2113
2114     @Override
2115     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2116
2117         scheduleRequest(request, callback, null);
2118
2119     }
2120
2121     @Override
2122     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2123
2124         scheduleRequest(request, procedure, null);
2125
2126     }
2127
2128     @Override
2129     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2130
2131         scheduleRequest(request, procedure, null);
2132
2133     }
2134
2135     @Override
2136     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2137
2138         scheduleRequest(request, procedure, null);
2139
2140     }
2141
2142     @Override
2143     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2144
2145         scheduleRequest(request, callback, null);
2146
2147     }
2148
2149     @Override
2150     public void asyncRequest(final Write r) {
2151         asyncRequest(r, null);
2152     }
2153
2154     @Override
2155     public void asyncRequest(final DelayedWrite r) {
2156         asyncRequest(r, null);
2157     }
2158
2159     @Override
2160     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2161
2162         scheduleRequest(request, callback, null);
2163
2164     }
2165
2166     @Override
2167     public void asyncRequest(final WriteOnly request) {
2168
2169         asyncRequest(request, null);
2170
2171     }
2172
2173     @Override
2174     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2175         r.request(this, procedure);
2176     }
2177
2178     @Override
2179     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2180         r.request(this, procedure);
2181     }
2182
2183     @Override
2184     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2185         r.request(this, procedure);
2186     }
2187
2188     @Override
2189     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2190         r.request(this, procedure);
2191     }
2192
2193     @Override
2194     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2195         r.request(this, procedure);
2196     }
2197
2198     @Override
2199     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2200         r.request(this, procedure);
2201     }
2202
2203     @Override
2204     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2205         return r.request(this);
2206     }
2207
2208     @Override
2209     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2210         return r.request(this);
2211     }
2212
2213     @Override
2214     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2215         r.request(this, procedure);
2216     }
2217
2218     @Override
2219     public <T> void async(WriteInterface<T> r) {
2220         r.request(this, new ProcedureAdapter<T>());
2221     }
2222
2223     //@Override
2224     public void incAsync() {
2225         state.incAsync();
2226     }
2227
2228     //@Override
2229     public void decAsync() {
2230         state.decAsync();
2231     }
2232
2233     public long getCluster(ResourceImpl resource) {
2234         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2235         return cluster.getClusterId();
2236     }
2237
2238     public long getCluster(int id) {
2239         if (clusterTable == null)
2240             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2241         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2242     }
2243
2244     public ResourceImpl getResource(int id) {
2245         return new ResourceImpl(resourceSupport, id);
2246     }
2247
2248     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2249         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2250         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2251         int key = proxy.getClusterKey();
2252         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2253         return new ResourceImpl(resourceSupport, resourceKey);
2254     }
2255
2256     public ResourceImpl getResource2(int id) {
2257         assert (id != 0);
2258         return new ResourceImpl(resourceSupport, id);
2259     }
2260
2261     final public int getId(ResourceImpl impl) {
2262         return impl.id;
2263     }
2264
2265     public static final Charset UTF8 = Charset.forName("utf-8");
2266
2267
2268
2269
2270     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2271         for(TransientGraph g : support.providers) {
2272             if(g.isPending(subject)) return false;
2273         }
2274         return true;
2275     }
2276
2277     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2278         for(TransientGraph g : support.providers) {
2279             if(g.isPending(subject, predicate)) return false;
2280         }
2281         return true;
2282     }
2283
2284     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2285
2286         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2287
2288             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2289
2290             @Override
2291             public void accept(ReadGraphImpl graph) {
2292                 if(ready.decrementAndGet() == 0) {
2293                     runnable.accept(graph);
2294                 }
2295             }
2296
2297         };
2298
2299         for(TransientGraph g : support.providers) {
2300             if(g.isPending(subject)) {
2301                 try {
2302                     g.load(graph, subject, composite);
2303                 } catch (DatabaseException e) {
2304                     e.printStackTrace();
2305                 }
2306             } else {
2307                 composite.accept(graph);
2308             }
2309         }
2310
2311         composite.accept(graph);
2312
2313     }
2314
2315     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2316
2317         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2318
2319             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2320
2321             @Override
2322             public void accept(ReadGraphImpl graph) {
2323                 if(ready.decrementAndGet() == 0) {
2324                     runnable.accept(graph);
2325                 }
2326             }
2327
2328         };
2329
2330         for(TransientGraph g : support.providers) {
2331             if(g.isPending(subject, predicate)) {
2332                 try {
2333                     g.load(graph, subject, predicate, composite);
2334                 } catch (DatabaseException e) {
2335                     e.printStackTrace();
2336                 }
2337             } else {
2338                 composite.accept(graph);
2339             }
2340         }
2341
2342         composite.accept(graph);
2343
2344     }
2345
2346 //    void dumpHeap() {
2347 //
2348 //        try {
2349 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2350 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2351 //                    "d:/heap" + flushCounter + ".txt", true);
2352 //        } catch (IOException e) {
2353 //            e.printStackTrace();
2354 //        }
2355 //
2356 //    }
2357
2358     void fireReactionsToSynchronize(ChangeSet cs) {
2359
2360         // Do not fire empty events
2361         if (cs.isEmpty())
2362             return;
2363
2364         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2365 //        g.state.barrier.inc();
2366
2367         try {
2368
2369             if (!cs.isEmpty()) {
2370                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2371                 for (ChangeListener l : changeListeners2) {
2372                     try {
2373                         l.graphChanged(e2);
2374                     } catch (Exception ex) {
2375                         ex.printStackTrace();
2376                     }
2377                 }
2378             }
2379
2380         } finally {
2381
2382 //            g.state.barrier.dec();
2383 //            g.waitAsync(null);
2384
2385         }
2386
2387     }
2388
2389     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2390         try {
2391
2392             // Do not fire empty events
2393             if (cs2.isEmpty())
2394                 return;
2395
2396 //            graph.restart();
2397 //            graph.state.barrier.inc();
2398
2399             try {
2400
2401                 if (!cs2.isEmpty()) {
2402
2403                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2404                     for (ChangeListener l : changeListeners2) {
2405                         try {
2406                             l.graphChanged(e2);
2407 //                            System.out.println("changelistener " + l);
2408                         } catch (Exception ex) {
2409                             ex.printStackTrace();
2410                         }
2411                     }
2412
2413                 }
2414
2415             } finally {
2416
2417 //                graph.state.barrier.dec();
2418 //                graph.waitAsync(null);
2419
2420             }
2421         } catch (Throwable t) {
2422             t.printStackTrace();
2423
2424         }
2425     }
2426     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2427
2428         try {
2429
2430             // Do not fire empty events
2431             if (cs2.isEmpty())
2432                 return;
2433
2434             // Do not fire on virtual requests
2435             if(graph.getProvider() != null)
2436                 return;
2437
2438             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2439
2440             try {
2441
2442                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2443                 for (ChangeListener l : metadataListeners) {
2444                     try {
2445                         l.graphChanged(e2);
2446                     } catch (Throwable ex) {
2447                         ex.printStackTrace();
2448                     }
2449                 }
2450
2451             } finally {
2452
2453             }
2454
2455         } catch (Throwable t) {
2456             t.printStackTrace();
2457         }
2458
2459     }
2460
2461
2462     /*
2463      * (non-Javadoc)
2464      *
2465      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2466      */
2467     @Override
2468     public <T> T getService(Class<T> api) {
2469         T t = peekService(api);
2470         if (t == null) {
2471             if (state.isClosed())
2472                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2473             throw new ServiceNotFoundException(this, api);
2474         }
2475         return t;
2476     }
2477
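/**
 * Supplies the server information that {@code peekService} hands out for
 * {@code ServerInformation.class}.
 *
 * @return the server information provided by the concrete subclass
 *         (presumably a cached copy, judging by the method name)
 */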
2481     protected abstract ServerInformation getCachedServerInformation();
2482
2483         private Class<?> serviceKey1 = null;
2484         private Class<?> serviceKey2 = null;
2485         private Object service1 = null;
2486         private Object service2 = null;
2487
2488     /*
2489      * (non-Javadoc)
2490      *
2491      * @see
2492      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2493      */
2494     @SuppressWarnings("unchecked")
2495     @Override
2496     public synchronized <T> T peekService(Class<T> api) {
2497
2498                 if(serviceKey1 == api) {
2499                         return (T)service1;
2500                 } else if (serviceKey2 == api) {
2501                         // Promote this key
2502                         Object result = service2;
2503                         service2 = service1;
2504                         serviceKey2 = serviceKey1;
2505                         service1 = result;
2506                         serviceKey1 = api;
2507                         return (T)result;
2508                 }
2509
2510         if (Layer0.class == api)
2511                 return (T) L0;
2512         if (ServerInformation.class == api)
2513             return (T) getCachedServerInformation();
2514         else if (WriteGraphImpl.class == api)
2515             return (T) writeState.getGraph();
2516         else if (ClusterBuilder.class == api)
2517             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2518         else if (ClusterBuilderFactory.class == api)
2519             return (T)new ClusterBuilderFactoryImpl(this);
2520
2521                 service2 = service1;
2522                 serviceKey2 = serviceKey1;
2523
2524         service1 = serviceLocator.peekService(api);
2525         serviceKey1 = api;
2526
2527         return (T)service1;
2528
2529     }
2530
2531     /*
2532      * (non-Javadoc)
2533      *
2534      * @see
2535      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2536      */
2537     @Override
2538     public boolean hasService(Class<?> api) {
2539         return serviceLocator.hasService(api);
2540     }
2541
2542     /**
2543      * @param api the api that must be implemented by the specified service
2544      * @param service the service implementation
2545      */
2546     @Override
2547     public <T> void registerService(Class<T> api, T service) {
2548         if(Layer0.class == api) {
2549                 L0 = (Layer0)service;
2550                 return;
2551         }
2552         serviceLocator.registerService(api, service);
2553         if (TransactionPolicySupport.class == api) {
2554             transactionPolicy = (TransactionPolicySupport)service;
2555             state.resetTransactionPolicy();
2556         }
2557         if (api == serviceKey1)
2558                 service1 = service;
2559         else if (api == serviceKey2)
2560                 service2 = service;
2561     }
2562
2563     // ----------------
2564     // SessionMonitor
2565     // ----------------
2566
2567     void fireSessionVariableChange(String variable) {
2568         for (MonitorHandler h : monitorHandlers) {
2569             MonitorContext ctx = monitorContexts.getLeft(h);
2570             assert ctx != null;
2571             // SafeRunner functionality repeated here to avoid dependency.
2572             try {
2573                 h.valuesChanged(ctx);
2574             } catch (Exception e) {
2575                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2576             } catch (LinkageError e) {
2577                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2578             }
2579         }
2580     }
2581
2582
2583     class ResourceSerializerImpl implements ResourceSerializer {
2584
2585         public long createRandomAccessId(int id) throws DatabaseException {
2586             if(id < 0)
2587                 return id;
2588             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2589             long cluster = getCluster(id);
2590             if (0 == cluster)
2591                 return 0; // Better to return 0 then invalid id.
2592             long result = ClusterTraitsBase.createResourceId(cluster, index);
2593             return result;
2594         }
2595
2596         @Override
2597         public long getRandomAccessId(Resource resource) throws DatabaseException {
2598
2599             ResourceImpl resourceImpl = (ResourceImpl) resource;
2600             return createRandomAccessId(resourceImpl.id);
2601
2602         }
2603
2604         @Override
2605         public int getTransientId(Resource resource) throws DatabaseException {
2606
2607             ResourceImpl resourceImpl = (ResourceImpl) resource;
2608             return resourceImpl.id;
2609
2610         }
2611
2612         @Override
2613         public String createRandomAccessId(Resource resource)
2614         throws InvalidResourceReferenceException {
2615
2616             if(resource == null) throw new IllegalArgumentException();
2617
2618             ResourceImpl resourceImpl = (ResourceImpl) resource;
2619
2620             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2621
2622             int r;
2623             try {
2624                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2625             } catch (DatabaseException e1) {
2626                 throw new InvalidResourceReferenceException(e1);
2627             }
2628             try {
2629                 // Serialize as '<resource index>_<cluster id>'
2630                 return "" + r + "_" + getCluster(resourceImpl);
2631             } catch (Throwable e) {
2632                 e.printStackTrace();
2633                 throw new InvalidResourceReferenceException(e);
2634             } finally {
2635             }
2636         }
2637
2638         public int getTransientId(long serialized) throws DatabaseException {
2639             if (serialized <= 0)
2640                 return (int)serialized;
2641             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2642             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2643             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2644             if (cluster == null)
2645                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2646             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2647             return key;
2648         }
2649
2650         @Override
2651         public Resource getResource(long randomAccessId) throws DatabaseException {
2652             return getResourceByKey(getTransientId(randomAccessId));
2653         }
2654
2655         @Override
2656         public Resource getResource(int transientId) throws DatabaseException {
2657             return getResourceByKey(transientId);
2658         }
2659
2660         @Override
2661         public Resource getResource(String randomAccessId)
2662         throws InvalidResourceReferenceException {
2663             try {
2664                 int i = randomAccessId.indexOf('_');
2665                 if (i == -1)
2666                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2667                             + randomAccessId + "'");
2668                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2669                 if(r < 0) return getResourceByKey(r);
2670                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2671                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2672                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2673                 if (cluster.hasResource(key, clusterTranslator))
2674                     return getResourceByKey(key);
2675             } catch (InvalidResourceReferenceException e) {
2676                 throw e;
2677             } catch (NumberFormatException e) {
2678                 throw new InvalidResourceReferenceException(e);
2679             } catch (Throwable e) {
2680                 e.printStackTrace();
2681                 throw new InvalidResourceReferenceException(e);
2682             } finally {
2683             }
2684             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2685         }
2686
2687         @Override
2688         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2689             try {
2690                 return true;
2691             } catch (Throwable e) {
2692                 e.printStackTrace();
2693                 throw new InvalidResourceReferenceException(e);
2694             } finally {
2695             }
2696         }
2697     }
2698
2699     // Copied from old SessionImpl
2700
2701     void check() {
2702         if (state.isClosed())
2703             throw new Error("Session closed.");
2704     }
2705
2706
2707
2708     public ResourceImpl getNewResource(long clusterId) {
2709         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2710         int newId;
2711         try {
2712             newId = cluster.createResource(clusterTranslator);
2713         } catch (DatabaseException e) {
2714             Logger.defaultLogError(e);
2715             return null;
2716         }
2717         return new ResourceImpl(resourceSupport, newId);
2718     }
2719
2720     public ResourceImpl getNewResource(Resource clusterSet)
2721     throws DatabaseException {
2722         long resourceId = clusterSet.getResourceId();
2723         Long clusterId = clusterSetsSupport.get(resourceId);
2724         if (null == clusterId)
2725             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2726         if (Constants.NewClusterId == clusterId) {
2727             clusterId = getService(ClusteringSupport.class).createCluster();
2728             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2729                 createdClusters.add(clusterId);
2730             clusterSetsSupport.put(resourceId, clusterId);
2731             return getNewResource(clusterId);
2732         } else {
2733             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2734             ResourceImpl result;
2735             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2736                 clusterId = getService(ClusteringSupport.class).createCluster();
2737                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2738                     createdClusters.add(clusterId);
2739                 clusterSetsSupport.put(resourceId, clusterId);
2740                 return getNewResource(clusterId);
2741             } else {
2742                 result = getNewResource(clusterId);
2743                 int resultKey = querySupport.getId(result);
2744                 long resultCluster = querySupport.getClusterId(resultKey);
2745                 if (clusterId != resultCluster)
2746                     clusterSetsSupport.put(resourceId, resultCluster);
2747                 return result;
2748             }
2749         }
2750     }
2751
2752     public void getNewClusterSet(Resource clusterSet)
2753     throws DatabaseException {
2754         if (DEBUG)
2755             System.out.println("new cluster set=" + clusterSet);
2756         long resourceId = clusterSet.getResourceId();
2757         if (clusterSetsSupport.containsKey(resourceId))
2758             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2759         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2760     }
2761     public boolean containsClusterSet(Resource clusterSet)
2762     throws ServiceException {
2763         long resourceId = clusterSet.getResourceId();
2764         return clusterSetsSupport.containsKey(resourceId);
2765     }
2766     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2767         Resource r = defaultClusterSet;
2768         defaultClusterSet = clusterSet;
2769         return r;
2770     }
2771     void printDiagnostics() {
2772
2773         if (DIAGNOSTICS) {
2774
2775             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2776             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2777             for(ClusterI cluster : clusterTable.getClusters()) {
2778                 try {
2779                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2780                         residentClusters.set(residentClusters.get() + 1);
2781                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2782                     }
2783                 } catch (DatabaseException e) {
2784                     Logger.defaultLogError(e);
2785                 }
2786             }
2787
2788             System.out.println("--------------------------------");
2789             System.out.println("Cluster information:");
2790             System.out.println("-amount of resident clusters=" + residentClusters.get());
2791             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2792
2793             for(ClusterI cluster : clusterTable.getClusters()) {
2794                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2795                 try {
2796                     if (!cluster.isLoaded())
2797                         System.out.println("not loaded]");
2798                     else if (cluster.isEmpty())
2799                         System.out.println("is empty]");
2800                     else {
2801                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2802                         System.out.print(",references=" + cluster.getReferenceCount());
2803                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2804                     }
2805                 } catch (DatabaseException e) {
2806                     Logger.defaultLogError(e);
2807                     System.out.println("is corrupted]");
2808                 }
2809             }
2810
2811             queryProvider2.printDiagnostics();
2812             System.out.println("--------------------------------");
2813         }
2814
2815     }
2816
2817     public void fireStartReadTransaction() {
2818
2819         if (DIAGNOSTICS)
2820             System.out.println("StartReadTransaction");
2821
2822         for (SessionEventListener listener : eventListeners) {
2823             try {
2824                 listener.readTransactionStarted();
2825             } catch (Throwable t) {
2826                 t.printStackTrace();
2827             }
2828         }
2829
2830     }
2831
2832     public void fireFinishReadTransaction() {
2833
2834         if (DIAGNOSTICS)
2835             System.out.println("FinishReadTransaction");
2836
2837         for (SessionEventListener listener : eventListeners) {
2838             try {
2839                 listener.readTransactionFinished();
2840             } catch (Throwable t) {
2841                 t.printStackTrace();
2842             }
2843         }
2844
2845     }
2846
2847     public void fireStartWriteTransaction() {
2848
2849         if (DIAGNOSTICS)
2850             System.out.println("StartWriteTransaction");
2851
2852         for (SessionEventListener listener : eventListeners) {
2853             try {
2854                 listener.writeTransactionStarted();
2855             } catch (Throwable t) {
2856                 t.printStackTrace();
2857             }
2858         }
2859
2860     }
2861
2862     public void fireFinishWriteTransaction() {
2863
2864         if (DIAGNOSTICS)
2865             System.out.println("FinishWriteTransaction");
2866
2867         for (SessionEventListener listener : eventListeners) {
2868             try {
2869                 listener.writeTransactionFinished();
2870             } catch (Throwable t) {
2871                 t.printStackTrace();
2872             }
2873         }
2874
2875         Indexing.resetDependenciesIndexingDisabled();
2876
2877     }
2878
2879     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2880         throw new Error("Not supported at the moment.");
2881     }
2882
2883     State getState() {
2884         return state;
2885     }
2886
2887     ClusterTable getClusterTable() {
2888         return clusterTable;
2889     }
2890
2891     public GraphSession getGraphSession() {
2892         return graphSession;
2893     }
2894
2895     public QueryProcessor getQueryProvider2() {
2896         return queryProvider2;
2897     }
2898
2899     ClientChangesImpl getClientChanges() {
2900         return clientChanges;
2901     }
2902
2903     boolean getWriteOnly() {
2904         return writeOnly;
2905     }
2906
2907     static int counter = 0;
2908
2909     public void onClusterLoaded(long clusterId) {
2910
2911         clusterTable.updateSize();
2912
2913     }
2914
2915
2916
2917
2918     /*
2919      * Implementation of the interface RequestProcessor
2920      */
2921
2922     @Override
2923     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2924
2925         assertNotSession();
2926         assertAlive();
2927
2928         assert(request != null);
2929
2930         final DataContainer<T> result = new DataContainer<T>();
2931         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2932
2933         syncRequest(request, new AsyncProcedure<T>() {
2934
2935             @Override
2936             public void execute(AsyncReadGraph graph, T t) {
2937                 result.set(t);
2938             }
2939
2940             @Override
2941             public void exception(AsyncReadGraph graph, Throwable t) {
2942                 exception.set(t);
2943             }
2944
2945         });
2946
2947         Throwable t = exception.get();
2948         if(t != null) {
2949             if(t instanceof DatabaseException) throw (DatabaseException)t;
2950             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2951         }
2952
2953         return result.get();
2954
2955     }
2956
2957 //    @Override
2958 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2959 //        assertNotSession();
2960 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2961 //    }
2962
2963     @Override
2964     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2965         assertNotSession();
2966         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2967     }
2968
2969     @Override
2970     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2971         assertNotSession();
2972         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2973     }
2974
2975     @Override
2976     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2977         assertNotSession();
2978         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2979     }
2980
2981     @Override
2982     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2983         assertNotSession();
2984         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2985     }
2986
2987     @Override
2988     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2989
2990         assertNotSession();
2991
2992         assert(request != null);
2993
2994         final DataContainer<T> result = new DataContainer<T>();
2995         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2996
2997         syncRequest(request, new AsyncProcedure<T>() {
2998
2999             @Override
3000             public void execute(AsyncReadGraph graph, T t) {
3001                 result.set(t);
3002             }
3003
3004             @Override
3005             public void exception(AsyncReadGraph graph, Throwable t) {
3006                 exception.set(t);
3007             }
3008
3009         });
3010
3011         Throwable t = exception.get();
3012         if(t != null) {
3013             if(t instanceof DatabaseException) throw (DatabaseException)t;
3014             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3015         }
3016
3017         return result.get();
3018
3019     }
3020
3021     @Override
3022     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3023         assertNotSession();
3024         return syncRequest(request, (AsyncProcedure<T>)procedure);
3025     }
3026
3027     @Override
3028     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3029         assertNotSession();
3030         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3031     }
3032
3033     @Override
3034     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3035         assertNotSession();
3036         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3037     }
3038
3039     @Override
3040     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3041         assertNotSession();
3042         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3043     }
3044
3045     @Override
3046     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3047         assertNotSession();
3048         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3049     }
3050
3051     @Override
3052     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3053
3054         assertNotSession();
3055
3056         assert(request != null);
3057
3058         final ArrayList<T> result = new ArrayList<T>();
3059         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3060
3061         syncRequest(request, new AsyncMultiProcedure<T>() {
3062
3063             @Override
3064             public void execute(AsyncReadGraph graph, T t) {
3065                 synchronized(result) {
3066                     result.add(t);
3067                 }
3068             }
3069
3070             @Override
3071             public void finished(AsyncReadGraph graph) {
3072             }
3073
3074             @Override
3075             public void exception(AsyncReadGraph graph, Throwable t) {
3076                 exception.set(t);
3077             }
3078
3079
3080         });
3081
3082         Throwable t = exception.get();
3083         if(t != null) {
3084             if(t instanceof DatabaseException) throw (DatabaseException)t;
3085             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3086         }
3087
3088         return result;
3089
3090     }
3091
3092     @Override
3093     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3094         assertNotSession();
3095         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3096     }
3097
3098     @Override
3099     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3100         assertNotSession();
3101         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3102     }
3103
3104     @Override
3105     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3108     }
3109
3110     @Override
3111     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3114     }
3115
3116     @Override
3117     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3118         assertNotSession();
3119         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3120     }
3121
3122     @Override
3123     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3124
3125         assertNotSession();
3126
3127         assert(request != null);
3128
3129         final ArrayList<T> result = new ArrayList<T>();
3130         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3131
3132         syncRequest(request, new AsyncMultiProcedure<T>() {
3133
3134             @Override
3135             public void execute(AsyncReadGraph graph, T t) {
3136                 synchronized(result) {
3137                     result.add(t);
3138                 }
3139             }
3140
3141             @Override
3142             public void finished(AsyncReadGraph graph) {
3143             }
3144
3145             @Override
3146             public void exception(AsyncReadGraph graph, Throwable t) {
3147                 exception.set(t);
3148             }
3149
3150
3151         });
3152
3153         Throwable t = exception.get();
3154         if(t != null) {
3155             if(t instanceof DatabaseException) throw (DatabaseException)t;
3156             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3157         }
3158
3159         return result;
3160
3161     }
3162
3163     @Override
3164     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3165         assertNotSession();
3166         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3167     }
3168
3169     @Override
3170     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3171         assertNotSession();
3172         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3177         assertNotSession();
3178        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3179     }
3180
3181     @Override
3182     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3183         assertNotSession();
3184         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3185     }
3186
3187     @Override
3188     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3189         assertNotSession();
3190         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3191     }
3192
3193
3194     @Override
3195     public <T> void asyncRequest(Read<T> request) {
3196
3197         asyncRequest(request, new ProcedureAdapter<T>() {
3198             @Override
3199             public void exception(Throwable t) {
3200                 t.printStackTrace();
3201             }
3202         });
3203
3204     }
3205
3206     @Override
3207     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3208         asyncRequest(request, (AsyncProcedure<T>)procedure);
3209     }
3210
3211     @Override
3212     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3213         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3214     }
3215
3216     @Override
3217     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3218         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3219     }
3220
3221     @Override
3222     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3223         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3224     }
3225
3226     @Override
3227     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3228         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3229     }
3230
3231     @Override
3232     final public <T> void asyncRequest(final AsyncRead<T> request) {
3233
3234         assert(request != null);
3235
3236         asyncRequest(request, new ProcedureAdapter<T>() {
3237             @Override
3238             public void exception(Throwable t) {
3239                 t.printStackTrace();
3240             }
3241         });
3242
3243     }
3244
3245     @Override
3246     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3247         scheduleRequest(request, procedure, procedure, null);
3248     }
3249
3250     @Override
3251     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3252         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3253     }
3254
3255     @Override
3256     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3257         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3258     }
3259
3260     @Override
3261     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3262         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3263     }
3264
3265     @Override
3266     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3267         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3268     }
3269
3270     @Override
3271     public <T> void asyncRequest(MultiRead<T> request) {
3272
3273         assert(request != null);
3274
3275         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3276             @Override
3277             public void exception(AsyncReadGraph graph, Throwable t) {
3278                 t.printStackTrace();
3279             }
3280         });
3281
3282     }
3283
3284     @Override
3285     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3286         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3287     }
3288
3289     @Override
3290     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3291         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3292     }
3293
3294     @Override
3295     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3296         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3297     }
3298
3299     @Override
3300     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3301         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3302     }
3303
3304     @Override
3305     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3306         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3307     }
3308
3309     @Override
3310     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3311
3312         assert(request != null);
3313
3314         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3315             @Override
3316             public void exception(AsyncReadGraph graph, Throwable t) {
3317                 t.printStackTrace();
3318             }
3319         });
3320
3321     }
3322
3323     @Override
3324     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3325         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3326     }
3327
3328     @Override
3329     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3330         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3331     }
3332
3333     @Override
3334     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3335         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3336     }
3337
3338     @Override
3339     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3340         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3341     }
3342
3343     @Override
3344     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3345         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3346     }
3347
3348     @Override
3349     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3350         assertNotSession();
3351         throw new Error("Not implemented!");
3352     }
3353
3354     @Override
3355     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3356         throw new Error("Not implemented!");
3357     }
3358
3359     @Override
3360     final public <T> void asyncRequest(final ExternalRead<T> request) {
3361
3362         assert(request != null);
3363
3364         asyncRequest(request, new ProcedureAdapter<T>() {
3365             @Override
3366             public void exception(Throwable t) {
3367                 t.printStackTrace();
3368             }
3369         });
3370
3371     }
3372
3373     @Override
3374     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3375         asyncRequest(request, (Procedure<T>)procedure);
3376     }
3377
3378
3379
3380     void check(Throwable t) throws DatabaseException {
3381         if(t != null) {
3382             if(t instanceof DatabaseException) throw (DatabaseException)t;
3383             else throw new DatabaseException("Unexpected exception", t);
3384         }
3385     }
3386
3387     void check(DataContainer<Throwable> container) throws DatabaseException {
3388         Throwable t = container.get();
3389         if(t != null) {
3390             if(t instanceof DatabaseException) throw (DatabaseException)t;
3391             else throw new DatabaseException("Unexpected exception", t);
3392         }
3393     }
3394
3395
3396
3397
3398
3399
3400     boolean sameProvider(Write request) {
3401         if(writeState.getGraph().provider != null) {
3402             return writeState.getGraph().provider.equals(request.getProvider());
3403         } else {
3404             return request.getProvider() == null;
3405         }
3406     }
3407
3408     boolean plainWrite(WriteGraphImpl graph) {
3409         if(graph == null) return false;
3410         if(graph.writeSupport.writeOnly()) return false;
3411         return true;
3412     }
3413
3414
3415     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3416     private void assertNotSession() throws DatabaseException {
3417         Thread current = Thread.currentThread();
3418         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3419     }
3420
3421     void assertAlive() {
3422         if (!state.isAlive())
3423             throw new RuntimeDatabaseException("Session has been shut down.");
3424     }
3425
3426     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3427         return querySupport.getValueStream(graph, querySupport.getId(resource));
3428     }
3429
3430     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3431         return querySupport.getValue(graph, querySupport.getId(resource));
3432     }
3433
3434     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3435         acquire(semaphore, request, null);
3436     }
3437
3438     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3439         assertAlive();
3440         try {
3441             long tryCount = 0;
3442             // Loop until the semaphore is acquired reporting requests that take a long time.
3443             while (true) {
3444
3445                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3446                     // Acquired OK.
3447                     return;
3448                 } else {
3449                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3450                         ++tryCount;
3451                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3452                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3453                                 * tryCount;
3454                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3455                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3456                     }
3457                 }
3458
3459                 assertAlive();
3460
3461             }
3462         } catch (InterruptedException e) {
3463             e.printStackTrace();
3464             // FIXME: Should perhaps do something else in this case ??
3465         }
3466     }
3467
3468
3469
3470 //    @Override
3471 //    public boolean holdOnToTransactionAfterCancel() {
3472 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3473 //    }
3474 //
3475 //    @Override
3476 //    public boolean holdOnToTransactionAfterCommit() {
3477 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3478 //    }
3479 //
3480 //    @Override
3481 //    public boolean holdOnToTransactionAfterRead() {
3482 //        return transactionPolicy.holdOnToTransactionAfterRead();
3483 //    }
3484 //
3485 //    @Override
3486 //    public void onRelinquish() {
3487 //        transactionPolicy.onRelinquish();
3488 //    }
3489 //
3490 //    @Override
3491 //    public void onRelinquishDone() {
3492 //        transactionPolicy.onRelinquishDone();
3493 //    }
3494 //
3495 //    @Override
3496 //    public void onRelinquishError() {
3497 //        transactionPolicy.onRelinquishError();
3498 //    }
3499
3500
3501     @Override
3502     public Session getSession() {
3503         return this;
3504     }
3505
3506
3507     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3508     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3509
3510     public void ceased(int thread) {
3511
3512         requestManager.ceased(thread);
3513
3514     }
3515
3516     public int getAmountOfQueryThreads() {
3517         // This must be a power of two
3518         return 4;
3519 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3520     }
3521
3522     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3523
3524         return new ResourceImpl(resourceSupport, key);
3525
3526     }
3527
3528     public void acquireWriteOnly() {
3529         writeOnly = true;
3530     }
3531
3532     public void releaseWriteOnly(ReadGraphImpl graph) {
3533         writeOnly = false;
3534         queryProvider2.releaseWrite(graph);
3535     }
3536
3537     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3538
3539         long start = System.nanoTime();
3540
3541         while(dirtyPrimitives) {
3542             dirtyPrimitives = false;
3543             getQueryProvider2().performDirtyUpdates(writer);
3544             getQueryProvider2().performScheduledUpdates(writer);
3545         }
3546
3547         fireMetadataListeners(writer, clientChanges);
3548
3549         if(DebugPolicy.PERFORMANCE_DATA) {
3550             long end = System.nanoTime();
3551             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3552         }
3553
3554     }
3555
3556     void removeTemporaryData() {
3557         File platform = Platform.getLocation().toFile();
3558         File tempFiles = new File(platform, "tempFiles");
3559         File temp = new File(tempFiles, "db");
3560         if(!temp.exists()) 
3561             return;
3562         try {
3563             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3564                 @Override
3565                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3566                     try {
3567                         Files.delete(file);
3568                     } catch (IOException e) {
3569                         Logger.defaultLogError(e);
3570                     }
3571                     return FileVisitResult.CONTINUE;
3572                 }
3573             });
3574         } catch (IOException e) {
3575             Logger.defaultLogError(e);
3576         }
3577     }
3578
3579     public void handleCreatedClusters() {
3580         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3581             createdClusters.forEach(new TLongProcedure() {
3582
3583                 @Override
3584                 public boolean execute(long value) {
3585                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3586                     cluster.setImmutable(true, clusterTranslator);
3587                     return true;
3588                 }
3589             });
3590         }
3591         createdClusters.clear();
3592     }
3593
3594     @Override
3595     public Object getModificationCounter() {
3596         return queryProvider2.modificationCounter;
3597     }
3598     
3599     @Override
3600     public void markUndoPoint() {
3601         state.setCombine(false);
3602     }
3603
3604 }