]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Better emptying of trash bin
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryProcessor;
93 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
94 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
95 import org.simantics.db.impl.service.QueryDebug;
96 import org.simantics.db.impl.support.VirtualGraphServerSupport;
97 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
98 import org.simantics.db.procedure.AsyncListener;
99 import org.simantics.db.procedure.AsyncMultiListener;
100 import org.simantics.db.procedure.AsyncMultiProcedure;
101 import org.simantics.db.procedure.AsyncProcedure;
102 import org.simantics.db.procedure.Listener;
103 import org.simantics.db.procedure.ListenerBase;
104 import org.simantics.db.procedure.MultiListener;
105 import org.simantics.db.procedure.MultiProcedure;
106 import org.simantics.db.procedure.Procedure;
107 import org.simantics.db.procedure.SyncListener;
108 import org.simantics.db.procedure.SyncMultiListener;
109 import org.simantics.db.procedure.SyncMultiProcedure;
110 import org.simantics.db.procedure.SyncProcedure;
111 import org.simantics.db.procore.cluster.ClusterImpl;
112 import org.simantics.db.procore.cluster.ClusterTraits;
113 import org.simantics.db.procore.protocol.Constants;
114 import org.simantics.db.procore.protocol.DebugPolicy;
115 import org.simantics.db.request.AsyncMultiRead;
116 import org.simantics.db.request.AsyncRead;
117 import org.simantics.db.request.DelayedWrite;
118 import org.simantics.db.request.DelayedWriteResult;
119 import org.simantics.db.request.ExternalRead;
120 import org.simantics.db.request.MultiRead;
121 import org.simantics.db.request.Read;
122 import org.simantics.db.request.ReadInterface;
123 import org.simantics.db.request.Write;
124 import org.simantics.db.request.WriteInterface;
125 import org.simantics.db.request.WriteOnly;
126 import org.simantics.db.request.WriteOnlyResult;
127 import org.simantics.db.request.WriteResult;
128 import org.simantics.db.request.WriteTraits;
129 import org.simantics.db.service.ByteReader;
130 import org.simantics.db.service.ClusterBuilder;
131 import org.simantics.db.service.ClusterBuilderFactory;
132 import org.simantics.db.service.ClusterControl;
133 import org.simantics.db.service.ClusterSetsSupport;
134 import org.simantics.db.service.ClusterUID;
135 import org.simantics.db.service.ClusteringSupport;
136 import org.simantics.db.service.CollectionSupport;
137 import org.simantics.db.service.DebugSupport;
138 import org.simantics.db.service.DirectQuerySupport;
139 import org.simantics.db.service.GraphChangeListenerSupport;
140 import org.simantics.db.service.InitSupport;
141 import org.simantics.db.service.LifecycleSupport;
142 import org.simantics.db.service.ManagementSupport;
143 import org.simantics.db.service.QueryControl;
144 import org.simantics.db.service.SerialisationSupport;
145 import org.simantics.db.service.ServerInformation;
146 import org.simantics.db.service.ServiceActivityMonitor;
147 import org.simantics.db.service.SessionEventSupport;
148 import org.simantics.db.service.SessionMonitorSupport;
149 import org.simantics.db.service.SessionUserSupport;
150 import org.simantics.db.service.StatementSupport;
151 import org.simantics.db.service.TransactionPolicySupport;
152 import org.simantics.db.service.TransactionSupport;
153 import org.simantics.db.service.TransferableGraphSupport;
154 import org.simantics.db.service.UndoRedoSupport;
155 import org.simantics.db.service.VirtualGraphSupport;
156 import org.simantics.db.service.XSupport;
157 import org.simantics.layer0.Layer0;
158 import org.simantics.utils.DataContainer;
159 import org.simantics.utils.Development;
160 import org.simantics.utils.threads.logger.ITask;
161 import org.simantics.utils.threads.logger.ThreadLogger;
162
163 import gnu.trove.procedure.TLongProcedure;
164 import gnu.trove.set.hash.TLongHashSet;
165
166
167 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
168
169     protected static final boolean DEBUG        = false;
170
171     private static final boolean DIAGNOSTICS       = false;
172
173     private TransactionPolicySupport transactionPolicy;
174
175     final private ClusterControl clusterControl;
176
177     final protected BuiltinSupportImpl builtinSupport;
178     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
179     final protected ClusterSetsSupport clusterSetsSupport;
180     final protected LifecycleSupportImpl lifecycleSupport;
181
182     protected QuerySupportImpl querySupport;
183     protected ResourceSupportImpl resourceSupport;
184     protected WriteSupport writeSupport;
185     public ClusterTranslator clusterTranslator;
186
187     boolean dirtyPrimitives = false;
188
189     public static final int SERVICE_MODE_CREATE = 2;
190     public static final int SERVICE_MODE_ALLOW = 1;
191     public int serviceMode = 0;
192     public boolean createdImmutableClusters = false;
193     public TLongHashSet createdClusters = new TLongHashSet();
194
195     private Layer0 L0;
196
197     /**
198      * The service locator maintained by the workbench. These services are
199      * initialized during workbench during the <code>init</code> method.
200      */
201     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
202
203     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
204
205     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
206
207     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
210
211     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
212
213     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
214
215     final protected State                                        state              = new State();
216
217     protected GraphSession                                       graphSession       = null;
218
219     protected SessionManager                                     sessionManagerImpl = null;
220
221     protected UserAuthenticationAgent                            authAgent          = null;
222
223     protected UserAuthenticator                                  authenticator      = null;
224
225     protected Resource                                           user               = null;
226
227     protected ClusterStream                                      clusterStream      = null;
228
229     protected SessionRequestManager                         requestManager = null;
230
231     public ClusterTable                                       clusterTable       = null;
232
233     public QueryProcessor                                     queryProvider2 = null;
234
235     ClientChangesImpl                                  clientChanges      = null;
236
237     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
238
239 //    protected long                                               newClusterId       = Constants.NullClusterId;
240
241     protected int                                                flushCounter       = 0;
242
243     protected boolean                                            writeOnly          = false;
244
245     WriteState<?>                                                writeState = null;
246     WriteStateBase<?>                                            delayedWriteState = null;
247     protected Resource defaultClusterSet = null; // If not null then used for newResource().
248
249     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
250
251 //        if (authAgent == null)
252 //            throw new IllegalArgumentException("null authentication agent");
253
254         File t = StaticSessionProperties.virtualGraphStoragePath;
255         if (null == t)
256             t = new File(".");
257         this.clusterTable = new ClusterTable(this, t);
258         this.builtinSupport = new BuiltinSupportImpl(this);
259         this.sessionManagerImpl = sessionManagerImpl;
260         this.user = null;
261 //        this.authAgent = authAgent;
262
263         serviceLocator.registerService(Session.class, this);
264         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
265         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
266         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
267         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
268         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
269         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
270         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
271         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
272         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
273         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
274         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
275         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
276         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
277         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
278         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
279         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
280         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
281         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
282         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
283         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
284         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
285         ServiceActivityUpdaterForWriteTransactions.register(this);
286
287         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
288         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
289         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
290         this.lifecycleSupport = new LifecycleSupportImpl(this);
291         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
292         this.transactionPolicy = new TransactionPolicyRelease();
293         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
294         this.clusterControl = new ClusterControlImpl(this);
295         serviceLocator.registerService(ClusterControl.class, clusterControl);
296         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
297         this.clusterSetsSupport.setReadDirectory(t.toPath());
298         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
299         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
300     }
301
302     /*
303      *
304      * Session interface
305      */
306
307
308     @Override
309     public Resource getRootLibrary() {
310         return queryProvider2.getRootLibraryResource();
311     }
312
313     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
314         if (!graphSession.dbSession.refreshEnabled())
315             return;
316         try {
317             getClusterTable().refresh(csid, this, clusterUID);
318         } catch (Throwable t) {
319             Logger.defaultLogError("Refesh failed.", t);
320         }
321     }
322
323     static final class TaskHelper {
324         private final String name;
325         private Object result;
326         final Semaphore sema = new Semaphore(0);
327         private Throwable throwable = null;
328         final Consumer<DatabaseException> callback = e -> {
329             synchronized (TaskHelper.this) {
330                 throwable = e;
331             }
332         };
333         final Procedure<Object> proc = new Procedure<Object>() {
334             @Override
335             public void execute(Object result) {
336                 callback.accept(null);
337             }
338             @Override
339             public void exception(Throwable t) {
340                 if (t instanceof DatabaseException)
341                     callback.accept((DatabaseException)t);
342                 else
343                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
344             }
345         };
346         final WriteTraits writeTraits = new WriteTraits() {};
347         TaskHelper(String name) {
348             this.name = name;
349         }
350         @SuppressWarnings("unchecked")
351         <T> T getResult() {
352             return (T)result;
353         }
354         void setResult(Object result) {
355             this.result = result;
356         }
357 //        Throwable throwableGet() {
358 //            return throwable;
359 //        }
360         synchronized void throwableSet(Throwable t) {
361             throwable = t;
362         }
363 //        void throwableSet(String t) {
364 //            throwable = new InternalException("" + name + " operation failed. " + t);
365 //        }
366         synchronized void throwableCheck()
367         throws DatabaseException {
368             if (null != throwable)
369                 if (throwable instanceof DatabaseException)
370                     throw (DatabaseException)throwable;
371                 else
372                     throw new DatabaseException("Undo operation failed.", throwable);
373         }
374         void throw_(String message)
375         throws DatabaseException {
376             throw new DatabaseException("" + name + " operation failed. " + message);
377         }
378     }
379
380
381
382     private ListenerBase getListenerBase(Object procedure) {
383         if (procedure instanceof ListenerBase)
384             return (ListenerBase) procedure;
385         else
386             return null;
387     }
388
389     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
390         scheduleRequest(request, callback, notify, null);
391     }
392
393     /* (non-Javadoc)
394      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
395      */
396     @Override
397     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
398
399         assert (request != null);
400
401         if(Development.DEVELOPMENT) {
402             try {
403             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
404                 System.err.println("schedule write '" + request + "'");
405             } catch (Throwable t) {
406                 Logger.defaultLogError(t);
407             }
408         }
409
410         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
411
412         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
413
414             @Override
415             public void run(int thread) {
416
417                 if(Development.DEVELOPMENT) {
418                     try {
419                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
420                         System.err.println("perform write '" + request + "'");
421                     } catch (Throwable t) {
422                         Logger.defaultLogError(t);
423                     }
424                 }
425
426                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
427
428                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
429
430                 try {
431
432                     flushCounter = 0;
433                     Disposable.safeDispose(clientChanges);
434                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
435
436                     VirtualGraph vg = getProvider(request.getProvider());
437
438                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
439                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
440
441                         @Override
442                         public void execute(Object result) {
443                             if(callback != null) callback.accept(null);
444                         }
445
446                         @Override
447                         public void exception(Throwable t) {
448                             if(callback != null) callback.accept((DatabaseException)t);
449                         }
450
451                     });
452
453                     assert (null != writer);
454 //                    writer.state.barrier.inc();
455
456                     try {
457                         request.perform(writer);
458                         assert (null != writer);
459                     } catch (Throwable t) {
460                         if (!(t instanceof CancelTransactionException))
461                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
462                         writeState.except(t);
463                     } finally {
464 //                        writer.state.barrier.dec();
465 //                        writer.waitAsync(request);
466                     }
467
468
469                     assert(!queryProvider2.dirty);
470
471                 } catch (Throwable e) {
472
473                     // Log it first, just to be safe that the error is always logged.
474                     if (!(e instanceof CancelTransactionException))
475                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
476
477 //                    writeState.getGraph().state.barrier.dec();
478 //                    writeState.getGraph().waitAsync(request);
479
480                     writeState.except(e);
481
482 //                    try {
483 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
484 //                        // All we can do here is to log those, can't really pass them anywhere.
485 //                        if (callback != null) {
486 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
487 //                            else callback.run(new DatabaseException(e));
488 //                        }
489 //                    } catch (Throwable e2) {
490 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
491 //                    }
492 //                    boolean empty = clusterStream.reallyFlush();
493 //                    int callerThread = -1;
494 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
495 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
496 //                    try {
497 //                        // Send all local changes to server so it can calculate correct reverse change set.
498 //                        // This will call clusterStream.accept().
499 //                        state.cancelCommit(context, clusterStream);
500 //                        if (!empty) {
501 //                            if (!context.isOk()) // this is a blocking operation
502 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
503 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
504 //                        }
505 //                        state.cancelCommit2(context, clusterStream);
506 //                    } catch (DatabaseException e3) {
507 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
508 //                    } finally {
509 //                        clusterStream.setOff(false);
510 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
511 //                    }
512
513                 } finally {
514
515                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
516
517                 }
518
519                 task.finish();
520
521                 if(Development.DEVELOPMENT) {
522                     try {
523                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
524                         System.err.println("finish write '" + request + "'");
525                     } catch (Throwable t) {
526                         Logger.defaultLogError(t);
527                     }
528                 }
529
530             }
531
532         }, combine);
533
534     }
535
536     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
537         scheduleRequest(request, procedure, notify, null);
538     }
539
540     /* (non-Javadoc)
541      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
542      */
543     @Override
544     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
545
546         assert (request != null);
547
548         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
549
550         requestManager.scheduleWrite(new SessionTask(request, thread) {
551
552             @Override
553             public void run(int thread) {
554
555                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
556
557                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
558
559                 flushCounter = 0;
560                 Disposable.safeDispose(clientChanges);
561                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
562
563                 VirtualGraph vg = getProvider(request.getProvider());
564                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
565
566                 try {
567                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
568                     writeState = writeStateT;
569
570                     assert (null != writer);
571 //                    writer.state.barrier.inc();
572                     writeStateT.setResult(request.perform(writer));
573                     assert (null != writer);
574
575 //                    writer.state.barrier.dec();
576 //                    writer.waitAsync(null);
577
578                 } catch (Throwable e) {
579
580 //                    writer.state.barrier.dec();
581 //                    writer.waitAsync(null);
582
583                     writeState.except(e);
584
585 //                  state.stopWriteTransaction(clusterStream);
586 //
587 //              } catch (Throwable e) {
588 //                  // Log it first, just to be safe that the error is always logged.
589 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
590 //
591 //                  try {
592 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
593 //                      // All we can do here is to log those, can't really pass them anywhere.
594 //                      if (procedure != null) {
595 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
596 //                          else procedure.exception(new DatabaseException(e));
597 //                      }
598 //                  } catch (Throwable e2) {
599 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
600 //                  }
601 //
602 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
603 //
604 //                  state.stopWriteTransaction(clusterStream);
605
606                 } finally {
607                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
608                 }
609
610 //              if(notify != null) notify.release();
611
612                 task.finish();
613
614             }
615
616         }, combine);
617
618     }
619
620     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
621         scheduleRequest(request, callback, notify, null);
622     }
623
624     /* (non-Javadoc)
625      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
626      */
627     @Override
628     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
629
630         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
631
632         assert (request != null);
633
634         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
635
636         requestManager.scheduleWrite(new SessionTask(request, thread) {
637
638             @Override
639             public void run(int thread) {
640                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
641
642                 Procedure<Object> stateProcedure = new Procedure<Object>() {
643                     @Override
644                     public void execute(Object result) {
645                         if (callback != null)
646                             callback.accept(null);
647                     }
648                     @Override
649                     public void exception(Throwable t) {
650                         if (callback != null) {
651                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
652                             else callback.accept(new DatabaseException(t));
653                         } else
654                             Logger.defaultLogError("Unhandled exception", t);
655                     }
656                 };
657
658                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
659                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
660                 DelayedWriteGraph dwg = null;
661 //                newGraph.state.barrier.inc();
662
663                 try {
664                     dwg = new DelayedWriteGraph(newGraph);
665                     request.perform(dwg);
666                 } catch (Throwable e) {
667                     delayedWriteState.except(e);
668                     total.finish();
669                     dwg.close();
670                     return;
671                 } finally {
672 //                    newGraph.state.barrier.dec();
673 //                    newGraph.waitAsync(request);
674                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
675                 }
676
677                 delayedWriteState = null;
678
679                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
680                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
681
682                 flushCounter = 0;
683                 Disposable.safeDispose(clientChanges);
684                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
685
686                 acquireWriteOnly();
687
688                 VirtualGraph vg = getProvider(request.getProvider());
689                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
690
691                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
692
693                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
694
695                 assert (null != writer);
696 //                writer.state.barrier.inc();
697
698                 try {
699                     // Cannot cancel
700                     dwg.commit(writer, request);
701
702                         if(defaultClusterSet != null) {
703                                 XSupport xs = getService(XSupport.class);
704                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
705                                 ClusteringSupport cs = getService(ClusteringSupport.class);
706                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
707                         }
708
709                     // This makes clusters available from server
710                     clusterStream.reallyFlush();
711
712                     releaseWriteOnly(writer);
713                     handleUpdatesAndMetadata(writer);
714
715                 } catch (ServiceException e) {
716 //                    writer.state.barrier.dec();
717 //                    writer.waitAsync(null);
718
719                     // These shall be requested from server
720                     clusterTable.removeWriteOnlyClusters();
721                     // This makes clusters available from server
722                     clusterStream.reallyFlush();
723
724                     releaseWriteOnly(writer);
725                     writeState.except(e);
726                 } finally {
727                     // Debugging & Profiling
728                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
729                     task2.finish();
730                     total.finish();
731                 }
732             }
733
734         }, combine);
735
736     }
737
738     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
739         scheduleRequest(request, procedure, notify, null);
740     }
741
742     /* (non-Javadoc)
743      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
744      */
745     @Override
746     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
747         throw new Error("Not implemented");
748     }
749
750     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
751         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
752         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
753             createdClusters.add(cluster.clusterId);
754         }
755         return cluster;
756     }
757
758     class WriteOnlySupport implements WriteSupport {
759
760         ClusterStream stream;
761         ClusterImpl currentCluster;
762
763         public WriteOnlySupport() {
764             this.stream = clusterStream;
765         }
766
767         @Override
768         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
769             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
770         }
771
772         @Override
773         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
774
775             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
776
777             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
778                 if(s != queryProvider2.getRootLibrary())
779                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
780
781             try {
782                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
783             } catch (DatabaseException e) {
784                 Logger.defaultLogError(e);
785                 //throw e;
786             }
787
788             clientChanges.invalidate(s);
789
790             if (cluster.isWriteOnly())
791                 return;
792             queryProvider2.updateStatements(s, p);
793
794         }
795
796         @Override
797         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
798             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
799         }
800
801         @Override
802         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
803
804             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
805             try {
806                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
807             } catch (DatabaseException e) {
808                 Logger.defaultLogError(e);
809             }
810
811             clientChanges.invalidate(rid);
812
813             if (cluster.isWriteOnly())
814                 return;
815             queryProvider2.updateValue(rid);
816
817         }
818
819         @Override
820         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
821
822             if(amount < 65536) {
823                 claimValue(provider,resource, reader.readBytes(null, amount));
824                 return;
825             }
826
827             byte[] bytes = new byte[65536];
828
829             int rid = ((ResourceImpl)resource).id;
830             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
831             try {
832                 int left = amount;
833                 while(left > 0) {
834                     int block = Math.min(left, 65536);
835                     reader.readBytes(bytes, block);
836                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
837                     left -= block;
838                 }
839             } catch (DatabaseException e) {
840                 Logger.defaultLogError(e);
841             }
842
843             clientChanges.invalidate(rid);
844
845             if (cluster.isWriteOnly())
846                 return;
847             queryProvider2.updateValue(rid);
848
849         }
850
851         private void maintainCluster(ClusterImpl before, ClusterI after_) {
852             if(after_ != null && after_ != before) {
853                 ClusterImpl after = (ClusterImpl)after_;
854                 if(currentCluster == before) {
855                     currentCluster = after;
856                 }
857                 clusterTable.replaceCluster(after);
858             }
859         }
860
861         public int createResourceKey(int foreignCounter) throws DatabaseException {
862             if(currentCluster == null) {
863                 currentCluster = getNewResourceCluster();
864             }
865             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
866                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
867                 newCluster.foreignLookup = new byte[foreignCounter];
868                 currentCluster = newCluster;
869                 if (DEBUG)
870                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
871             }
872             return currentCluster.createResource(clusterTranslator);
873         }
874
875         @Override
876         public Resource createResource(VirtualGraph provider) throws DatabaseException {
877             if(currentCluster == null) {
878                 if (null != defaultClusterSet) {
879                         ResourceImpl result = getNewResource(defaultClusterSet);
880                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
881                     return result;
882                 } else {
883                     currentCluster = getNewResourceCluster();
884                 }
885             }
886             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
887                 if (null != defaultClusterSet) {
888                         ResourceImpl result = getNewResource(defaultClusterSet);
889                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
890                     return result;
891                 } else {
892                     currentCluster = getNewResourceCluster();
893                 }
894             }
895             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
896         }
897
898         @Override
899         public Resource createResource(VirtualGraph provider, long clusterId)
900         throws DatabaseException {
901             return getNewResource(clusterId);
902         }
903
904         @Override
905         public Resource createResource(VirtualGraph provider, Resource clusterSet)
906         throws DatabaseException {
907             return getNewResource(clusterSet);
908         }
909
910         @Override
911         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
912                 throws DatabaseException {
913             getNewClusterSet(clusterSet);
914         }
915
916         @Override
917         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
918         throws ServiceException {
919             return containsClusterSet(clusterSet);
920         }
921
922         public void selectCluster(long cluster) {
923                 currentCluster = clusterTable.getClusterByClusterId(cluster);
924                 long setResourceId = clusterSetsSupport.getSet(cluster);
925                 clusterSetsSupport.put(setResourceId, cluster);
926         }
927
928         @Override
929         public Resource setDefaultClusterSet(Resource clusterSet)
930         throws ServiceException {
931                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
932                 if(clusterSet != null) {
933                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
934                         currentCluster = clusterTable.getClusterByClusterId(id);
935                         return result;
936                 } else {
937                         currentCluster = null;
938                         return null;
939                 }
940         }
941
942         @Override
943         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
944
945                  provider = getProvider(provider);
946              if (null == provider) {
947                  int key = ((ResourceImpl)resource).id;
948
949
950
951                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
952 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
953 //                      if(key != queryProvider2.getRootLibrary())
954 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
955
956 //                 try {
957 //                     cluster.removeValue(key, clusterTranslator);
958 //                 } catch (DatabaseException e) {
959 //                     Logger.defaultLogError(e);
960 //                     return;
961 //                 }
962
963                  clusterTable.writeOnlyInvalidate(cluster);
964
965                  try {
966                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
967                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
968                          clusterTranslator.removeValue(cluster);
969                  } catch (DatabaseException e) {
970                          Logger.defaultLogError(e);
971                  }
972
973                  queryProvider2.invalidateResource(key);
974                  clientChanges.invalidate(key);
975
976              } else {
977                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
978                  queryProvider2.updateValue(querySupport.getId(resource));
979                  clientChanges.claimValue(resource);
980              }
981
982
983         }
984
985         @Override
986         public void flush(boolean intermediate) {
987             throw new UnsupportedOperationException();
988         }
989
990         @Override
991         public void flushCluster() {
992             clusterTable.flushCluster(graphSession);
993             if(defaultClusterSet != null) {
994                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
995             }
996             currentCluster = null;
997         }
998
999         @Override
1000         public void flushCluster(Resource r) {
1001             throw new UnsupportedOperationException("flushCluster resource " + r);
1002         }
1003
1004         @Override
1005         public void gc() {
1006         }
1007
1008         @Override
1009         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1010                 Resource object) {
1011
1012                 int s = ((ResourceImpl)subject).id;
1013                 int p = ((ResourceImpl)predicate).id;
1014                 int o = ((ResourceImpl)object).id;
1015
1016                 provider = getProvider(provider);
1017                 if (null == provider) {
1018
1019                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1020                         clusterTable.writeOnlyInvalidate(cluster);
1021
1022                         try {
1023
1024                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1025
1026                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1027                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1028
1029                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1030                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1031                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1032                                 clusterTranslator.removeStatement(cluster);
1033
1034                                 queryProvider2.invalidateResource(s);
1035                                 clientChanges.invalidate(s);
1036
1037                         } catch (DatabaseException e) {
1038
1039                                 Logger.defaultLogError(e);
1040
1041                         }
1042
1043                         return true;
1044
1045                 } else {
1046                         
1047                         ((VirtualGraphImpl)provider).deny(s, p, o);
1048                         queryProvider2.invalidateResource(s);
1049                         clientChanges.invalidate(s);
1050
1051                         return true;
1052
1053                 }
1054
1055         }
1056
1057         @Override
1058         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1059             throw new UnsupportedOperationException();
1060         }
1061
1062         @Override
1063         public boolean writeOnly() {
1064             return true;
1065         }
1066
1067         @Override
1068         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1069             writeSupport.performWriteRequest(graph, request);
1070         }
1071
1072         @Override
1073         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1074             throw new UnsupportedOperationException();
1075         }
1076
1077         @Override
1078         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1079             throw new UnsupportedOperationException();
1080         }
1081
1082         @Override
1083         public <T> void addMetadata(Metadata data) throws ServiceException {
1084             writeSupport.addMetadata(data);
1085         }
1086
1087         @Override
1088         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1089             return writeSupport.getMetadata(clazz);
1090         }
1091
1092         @Override
1093         public TreeMap<String, byte[]> getMetadata() {
1094             return writeSupport.getMetadata();
1095         }
1096
1097         @Override
1098         public void commitDone(WriteTraits writeTraits, long csid) {
1099             writeSupport.commitDone(writeTraits, csid);
1100         }
1101
1102         @Override
1103         public void clearUndoList(WriteTraits writeTraits) {
1104             writeSupport.clearUndoList(writeTraits);
1105         }
1106         @Override
1107         public int clearMetadata() {
1108             return writeSupport.clearMetadata();
1109         }
1110
1111                 @Override
1112                 public void startUndo() {
1113                         writeSupport.startUndo();
1114                 }
1115     }
1116
1117     class VirtualWriteOnlySupport implements WriteSupport {
1118
1119 //        @Override
1120 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1121 //                Resource object) {
1122 //            throw new UnsupportedOperationException();
1123 //        }
1124
1125         @Override
1126         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1127
1128             TransientGraph impl = (TransientGraph)provider;
1129             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1130             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1131             clientChanges.claim(subject, predicate, object);
1132
1133         }
1134
1135         @Override
1136         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1137
1138             TransientGraph impl = (TransientGraph)provider;
1139             impl.claim(subject, predicate, object);
1140             getQueryProvider2().updateStatements(subject, predicate);
1141             clientChanges.claim(subject, predicate, object);
1142
1143         }
1144
1145         @Override
1146         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1147             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1148         }
1149
1150         @Override
1151         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1152             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1153             getQueryProvider2().updateValue(resource);
1154             clientChanges.claimValue(resource);
1155         }
1156
1157         @Override
1158         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1159             byte[] value = reader.readBytes(null, amount);
1160             claimValue(provider, resource, value);
1161         }
1162
1163         @Override
1164         public Resource createResource(VirtualGraph provider) {
1165             TransientGraph impl = (TransientGraph)provider;
1166             return impl.getResource(impl.newResource(false));
1167         }
1168
1169         @Override
1170         public Resource createResource(VirtualGraph provider, long clusterId) {
1171             throw new UnsupportedOperationException();
1172         }
1173
1174         @Override
1175         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1176         throws DatabaseException {
1177             throw new UnsupportedOperationException();
1178         }
1179
1180         @Override
1181         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1182         throws DatabaseException {
1183             throw new UnsupportedOperationException();
1184         }
1185
1186         @Override
1187         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1188         throws ServiceException {
1189             throw new UnsupportedOperationException();
1190         }
1191
1192         @Override
1193         public Resource setDefaultClusterSet(Resource clusterSet)
1194         throws ServiceException {
1195                 return null;
1196         }
1197
1198         @Override
1199         public void denyValue(VirtualGraph provider, Resource resource) {
1200             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1201             getQueryProvider2().updateValue(querySupport.getId(resource));
1202             // NOTE: this only keeps track of value changes by-resource.
1203             clientChanges.claimValue(resource);
1204         }
1205
1206         @Override
1207         public void flush(boolean intermediate) {
1208             throw new UnsupportedOperationException();
1209         }
1210
1211         @Override
1212         public void flushCluster() {
1213             throw new UnsupportedOperationException();
1214         }
1215
1216         @Override
1217         public void flushCluster(Resource r) {
1218             throw new UnsupportedOperationException("Resource " + r);
1219         }
1220
1221         @Override
1222         public void gc() {
1223         }
1224
1225         @Override
1226         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1227             TransientGraph impl = (TransientGraph) provider;
1228             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1229             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1230             clientChanges.deny(subject, predicate, object);
1231             return true;
1232         }
1233
1234         @Override
1235         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1236             throw new UnsupportedOperationException();
1237         }
1238
1239         @Override
1240         public boolean writeOnly() {
1241             return true;
1242         }
1243
1244         @Override
1245         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1246             throw new UnsupportedOperationException();
1247         }
1248
1249         @Override
1250         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1251             throw new UnsupportedOperationException();
1252         }
1253
1254         @Override
1255         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1256             throw new UnsupportedOperationException();
1257         }
1258
1259         @Override
1260         public <T> void addMetadata(Metadata data) throws ServiceException {
1261             throw new UnsupportedOperationException();
1262         }
1263
1264         @Override
1265         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1266             throw new UnsupportedOperationException();
1267         }
1268
1269         @Override
1270         public TreeMap<String, byte[]> getMetadata() {
1271             throw new UnsupportedOperationException();
1272         }
1273
1274         @Override
1275         public void commitDone(WriteTraits writeTraits, long csid) {
1276         }
1277
1278         @Override
1279         public void clearUndoList(WriteTraits writeTraits) {
1280         }
1281
1282         @Override
1283         public int clearMetadata() {
1284             return 0;
1285         }
1286
1287                 @Override
1288                 public void startUndo() {
1289                 }
1290     }
1291
1292     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1293
1294         try {
1295
1296             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1297
1298             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1299
1300             flushCounter = 0;
1301             Disposable.safeDispose(clientChanges);
1302             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1303
1304             acquireWriteOnly();
1305
1306             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1307
1308             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1309
1310             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1311             writeState = writeStateT;
1312
1313             assert (null != writer);
1314 //            writer.state.barrier.inc();
1315             long start = System.nanoTime();
1316             T result = request.perform(writer);
1317             long duration = System.nanoTime() - start;
1318             if (DEBUG) {
1319                 System.err.println("################");
1320                 System.err.println("WriteOnly duration " + 1e-9*duration);
1321             }
1322             writeStateT.setResult(result);
1323
1324             // This makes clusters available from server
1325             clusterStream.reallyFlush();
1326
1327             // This will trigger query updates
1328             releaseWriteOnly(writer);
1329             assert (null != writer);
1330
1331             handleUpdatesAndMetadata(writer);
1332
1333         } catch (CancelTransactionException e) {
1334
1335             releaseWriteOnly(writeState.getGraph());
1336
1337             clusterTable.removeWriteOnlyClusters();
1338             state.stopWriteTransaction(clusterStream);
1339
1340         } catch (Throwable e) {
1341
1342             e.printStackTrace();
1343
1344             releaseWriteOnly(writeState.getGraph());
1345
1346             clusterTable.removeWriteOnlyClusters();
1347
1348             if (callback != null)
1349                 callback.exception(new DatabaseException(e));
1350
1351             state.stopWriteTransaction(clusterStream);
1352             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1353
1354         } finally  {
1355
1356             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1357
1358         }
1359
1360     }
1361
1362     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1363         scheduleRequest(request, callback, notify, null);
1364     }
1365
1366     @Override
1367     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1368
1369         assertAlive();
1370
1371         assert (request != null);
1372
1373         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1374
1375         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1376
1377             @Override
1378             public void run(int thread) {
1379
1380                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1381
1382                     try {
1383
1384                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1385
1386                         flushCounter = 0;
1387                         Disposable.safeDispose(clientChanges);
1388                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1389
1390                         acquireWriteOnly();
1391
1392                         VirtualGraph vg = getProvider(request.getProvider());
1393                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1394
1395                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1396
1397                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1398
1399                             @Override
1400                             public void execute(Object result) {
1401                                 if(callback != null) callback.accept(null);
1402                             }
1403
1404                             @Override
1405                             public void exception(Throwable t) {
1406                                 if(callback != null) callback.accept((DatabaseException)t);
1407                             }
1408
1409                         });
1410
1411                         assert (null != writer);
1412 //                        writer.state.barrier.inc();
1413
1414                         try {
1415
1416                             request.perform(writer);
1417
1418                         } catch (Throwable e) {
1419
1420 //                            writer.state.barrier.dec();
1421 //                            writer.waitAsync(null);
1422
1423                             releaseWriteOnly(writer);
1424
1425                             clusterTable.removeWriteOnlyClusters();
1426
1427                             if(!(e instanceof CancelTransactionException)) {
1428                                 if (callback != null)
1429                                     callback.accept(new DatabaseException(e));
1430                             }
1431
1432                             writeState.except(e);
1433
1434                             return;
1435
1436                         }
1437
1438                         // This makes clusters available from server
1439                         boolean empty = clusterStream.reallyFlush();
1440                         // This was needed to make WO requests call metadata listeners.
1441                         // NOTE: the calling event does not contain clientChanges information.
1442                         if (!empty && clientChanges.isEmpty())
1443                             clientChanges.setNotEmpty(true);
1444                         releaseWriteOnly(writer);
1445                         assert (null != writer);
1446                     } finally  {
1447
1448                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1449
1450                     }
1451
1452
1453                 task.finish();
1454
1455             }
1456
1457         }, combine);
1458
1459     }
1460
1461     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1462         scheduleRequest(request, callback, notify, null);
1463     }
1464
1465     /* (non-Javadoc)
1466      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1467      */
1468     @Override
1469     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1470
1471         assert (request != null);
1472
1473         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1474
1475         requestManager.scheduleWrite(new SessionTask(request, thread) {
1476
1477             @Override
1478             public void run(int thread) {
1479
1480                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1481
1482                 performWriteOnly(request, notify, callback);
1483
1484                 task.finish();
1485
1486             }
1487
1488         }, combine);
1489
1490     }
1491
1492     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1493
1494         assert (request != null);
1495         assert (procedure != null);
1496
1497         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1498
1499         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1500
1501             @Override
1502             public void run(int thread) {
1503
1504                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1505
1506                 ListenerBase listener = getListenerBase(procedure);
1507
1508                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1509
1510                 try {
1511
1512                     if (listener != null) {
1513
1514                         try {
1515                                 newGraph.processor.queryRead(newGraph, request, null, new AsyncProcedure<T>() {
1516
1517                                         @Override
1518                                         public void exception(AsyncReadGraph graph, Throwable t) {
1519                                                 procedure.exception(graph, t);
1520                                                 if(throwable != null) {
1521                                                         throwable.set(t);
1522                                                 } else {
1523                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1524                                                 }
1525                                         }
1526
1527                                         @Override
1528                                         public void execute(AsyncReadGraph graph, T t) {
1529                                                 if(result != null) result.set(t);
1530                                                 procedure.execute(graph, t);
1531                                         }
1532
1533                                 }, listener);
1534                         } catch (Throwable t) {
1535                             // This is handled by the AsyncProcedure
1536                                 //Logger.defaultLogError("Internal error", t);
1537                         }
1538
1539                     } else {
1540
1541                         try {
1542
1543 //                            newGraph.state.barrier.inc();
1544
1545                             T t = request.perform(newGraph);
1546
1547                             try {
1548
1549                                 if(result != null) result.set(t);
1550                                 procedure.execute(newGraph, t);
1551
1552                             } catch (Throwable th) {
1553
1554                                 if(throwable != null) {
1555                                     throwable.set(th);
1556                                 } else {
1557                                     Logger.defaultLogError("Unhandled exception", th);
1558                                 }
1559
1560                             }
1561
1562                         } catch (Throwable t) {
1563
1564                             if (DEBUG)
1565                                 t.printStackTrace();
1566
1567                             if(throwable != null) {
1568                                 throwable.set(t);
1569                             } else {
1570                                 Logger.defaultLogError("Unhandled exception", t);
1571                             }
1572
1573                             try {
1574
1575                                 procedure.exception(newGraph, t);
1576
1577                             } catch (Throwable t2) {
1578
1579                                 if(throwable != null) {
1580                                     throwable.set(t2);
1581                                 } else {
1582                                     Logger.defaultLogError("Unhandled exception", t2);
1583                                 }
1584
1585                             }
1586
1587                         }
1588
1589 //                        newGraph.state.barrier.dec();
1590 //                        newGraph.waitAsync(request);
1591
1592                     }
1593
1594                 } finally {
1595
1596                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1597
1598                 }
1599
1600             }
1601
1602         });
1603
1604     }
1605
1606     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1607
1608         assert (request != null);
1609         assert (procedure != null);
1610
1611         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1612
1613         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1614
1615             @Override
1616             public void run(int thread) {
1617
1618                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1619
1620                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1621
1622                 try {
1623
1624                     if (listener != null) {
1625
1626                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1627
1628 //                        newGraph.waitAsync(request);
1629
1630                     } else {
1631
1632                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1633                                 procedure, "request");
1634
1635                         try {
1636
1637 //                            newGraph.state.barrier.inc();
1638
1639                             request.perform(newGraph, wrapper);
1640
1641 //                            newGraph.waitAsync(request);
1642
1643                         } catch (Throwable t) {
1644
1645                             wrapper.exception(newGraph, t);
1646 //                            newGraph.waitAsync(request);
1647
1648
1649                         }
1650
1651                     }
1652
1653                 } finally {
1654
1655                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1656
1657                 }
1658
1659             }
1660
1661         });
1662
1663     }
1664
1665     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1666
1667         assert (request != null);
1668         assert (procedure != null);
1669
1670         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1671
1672         int sync = notify != null ? thread : -1;
1673
1674         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1675
1676             @Override
1677             public void run(int thread) {
1678
1679                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1680
1681                 ListenerBase listener = getListenerBase(procedure);
1682
1683                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1684
1685                 try {
1686
1687                     if (listener != null) {
1688
1689                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1690
1691 //                        newGraph.waitAsync(request);
1692
1693                     } else {
1694
1695                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1696
1697                         try {
1698
1699                             request.perform(newGraph, wrapper);
1700
1701                         } catch (Throwable t) {
1702
1703                             t.printStackTrace();
1704
1705                         }
1706
1707                     }
1708
1709                 } finally {
1710
1711                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1712
1713                 }
1714
1715             }
1716
1717         });
1718
1719     }
1720
1721     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1722
1723         assert (request != null);
1724         assert (procedure != null);
1725
1726         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1727
1728         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1729
1730             @Override
1731             public void run(int thread) {
1732
1733                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1734
1735                 ListenerBase listener = getListenerBase(procedure);
1736
1737                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1738
1739                 try {
1740
1741                     if (listener != null) {
1742
1743                         newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1744
1745                             @Override
1746                             public void exception(Throwable t) {
1747                                 procedure.exception(t);
1748                                 if(throwable != null) {
1749                                     throwable.set(t);
1750                                 }
1751                             }
1752
1753                             @Override
1754                             public void execute(T t) {
1755                                 if(result != null) result.set(t);
1756                                 procedure.execute(t);
1757                             }
1758
1759                         }, listener);
1760
1761 //                        newGraph.waitAsync(request);
1762
1763                     } else {
1764
1765 //                        newGraph.state.barrier.inc();
1766
1767                         request.register(newGraph, new Listener<T>() {
1768
1769                             @Override
1770                             public void exception(Throwable t) {
1771                                 if(throwable != null) throwable.set(t);
1772                                 procedure.exception(t);
1773 //                                newGraph.state.barrier.dec();
1774                             }
1775
1776                             @Override
1777                             public void execute(T t) {
1778                                 if(result != null) result.set(t);
1779                                 procedure.execute(t);
1780 //                                newGraph.state.barrier.dec();
1781                             }
1782
1783                             @Override
1784                             public boolean isDisposed() {
1785                                 return true;
1786                             }
1787
1788                         });
1789
1790 //                        newGraph.waitAsync(request);
1791
1792                     }
1793
1794                 } finally {
1795
1796                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1797
1798                 }
1799
1800             }
1801
1802         });
1803
1804     }
1805
1806
1807     @Override
1808     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1809
1810         scheduleRequest(request, procedure, null, null, null);
1811
1812     }
1813
1814     @Override
1815     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1816         assertNotSession();
1817         Semaphore notify = new Semaphore(0);
1818         DataContainer<Throwable> container = new DataContainer<Throwable>();
1819         DataContainer<T> result = new DataContainer<T>();
1820         scheduleRequest(request, procedure, notify, container, result);
1821         acquire(notify, request);
1822         Throwable throwable = container.get();
1823         if(throwable != null) {
1824             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1825             else throw new DatabaseException("Unexpected exception", throwable);
1826         }
1827         return result.get();
1828     }
1829
1830     @Override
1831     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1832         assertNotSession();
1833         Semaphore notify = new Semaphore(0);
1834         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1835         final DataContainer<T> resultContainer = new DataContainer<T>();
1836         scheduleRequest(request, new AsyncProcedure<T>() {
1837             @Override
1838             public void exception(AsyncReadGraph graph, Throwable throwable) {
1839                 exceptionContainer.set(throwable);
1840                 procedure.exception(graph, throwable);
1841             }
1842             @Override
1843             public void execute(AsyncReadGraph graph, T result) {
1844                 resultContainer.set(result);
1845                 procedure.execute(graph, result);
1846             }
1847         }, getListenerBase(procedure), notify);
1848         acquire(notify, request, procedure);
1849         Throwable throwable = exceptionContainer.get();
1850         if (throwable != null) {
1851             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1852             else throw new DatabaseException("Unexpected exception", throwable);
1853         }
1854         return resultContainer.get();
1855     }
1856
1857
1858     @Override
1859     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1860         assertNotSession();
1861         Semaphore notify = new Semaphore(0);
1862         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1863         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1864             @Override
1865             public void exception(AsyncReadGraph graph, Throwable throwable) {
1866                 exceptionContainer.set(throwable);
1867                 procedure.exception(graph, throwable);
1868             }
1869             @Override
1870             public void execute(AsyncReadGraph graph, T result) {
1871                 procedure.execute(graph, result);
1872             }
1873             @Override
1874             public void finished(AsyncReadGraph graph) {
1875                 procedure.finished(graph);
1876             }
1877         }, notify);
1878         acquire(notify, request, procedure);
1879         Throwable throwable = exceptionContainer.get();
1880         if (throwable != null) {
1881             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1882             else throw new DatabaseException("Unexpected exception", throwable);
1883         }
1884         // TODO: implement return value
1885         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1886         return null;
1887     }
1888
1889     @Override
1890     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1891         return syncRequest(request, new ProcedureAdapter<T>());
1892
1893
1894 //        assert(request != null);
1895 //
1896 //        final DataContainer<T> result = new DataContainer<T>();
1897 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1898 //
1899 //        syncRequest(request, new Procedure<T>() {
1900 //
1901 //            @Override
1902 //            public void execute(T t) {
1903 //                result.set(t);
1904 //            }
1905 //
1906 //            @Override
1907 //            public void exception(Throwable t) {
1908 //                exception.set(t);
1909 //            }
1910 //
1911 //        });
1912 //
1913 //        Throwable t = exception.get();
1914 //        if(t != null) {
1915 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1916 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1917 //        }
1918 //
1919 //        return result.get();
1920
1921     }
1922
1923     @Override
1924     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1925         return syncRequest(request, (Procedure<T>)procedure);
1926     }
1927
1928
1929 //    @Override
1930 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1931 //        assertNotSession();
1932 //        Semaphore notify = new Semaphore(0);
1933 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1934 //        DataContainer<T> result = new DataContainer<T>();
1935 //        scheduleRequest(request, procedure, notify, container, result);
1936 //        acquire(notify, request);
1937 //        Throwable throwable = container.get();
1938 //        if(throwable != null) {
1939 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1940 //            else throw new DatabaseException("Unexpected exception", throwable);
1941 //        }
1942 //        return result.get();
1943 //    }
1944
1945
1946     @Override
1947     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1948         assertNotSession();
1949         Semaphore notify = new Semaphore(0);
1950         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1951         final DataContainer<T> result = new DataContainer<T>();
1952         scheduleRequest(request, procedure, notify, container, result);
1953         acquire(notify, request);
1954         Throwable throwable = container.get();
1955         if (throwable != null) {
1956             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1957             else throw new DatabaseException("Unexpected exception", throwable);
1958         }
1959         return result.get();
1960     }
1961
1962     @Override
1963     public void syncRequest(Write request) throws DatabaseException  {
1964         assertNotSession();
1965         assertAlive();
1966         Semaphore notify = new Semaphore(0);
1967         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1968         scheduleRequest(request, e -> exception.set(e), notify);
1969         acquire(notify, request);
1970         if(exception.get() != null) throw exception.get();
1971     }
1972
1973     @Override
1974     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1975         assertNotSession();
1976         Semaphore notify = new Semaphore(0);
1977         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1978         final DataContainer<T> result = new DataContainer<T>();
1979         scheduleRequest(request, new Procedure<T>() {
1980
1981             @Override
1982             public void exception(Throwable t) {
1983               exception.set(t);
1984             }
1985
1986             @Override
1987             public void execute(T t) {
1988               result.set(t);
1989             }
1990
1991         }, notify);
1992         acquire(notify, request);
1993         if(exception.get() != null) {
1994             Throwable t = exception.get();
1995             if(t instanceof DatabaseException) throw (DatabaseException)t;
1996             else throw new DatabaseException(t);
1997         }
1998         return result.get();
1999     }
2000
2001     @Override
2002     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2003         assertNotSession();
2004         Semaphore notify = new Semaphore(0);
2005         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2006         final DataContainer<T> result = new DataContainer<T>();
2007         scheduleRequest(request, new Procedure<T>() {
2008
2009             @Override
2010             public void exception(Throwable t) {
2011               exception.set(t);
2012             }
2013
2014             @Override
2015             public void execute(T t) {
2016               result.set(t);
2017             }
2018
2019         }, notify);
2020         acquire(notify, request);
2021         if(exception.get() != null) {
2022             Throwable t = exception.get();
2023             if(t instanceof DatabaseException) throw (DatabaseException)t;
2024             else throw new DatabaseException(t);
2025         }
2026         return result.get();
2027     }
2028
2029     @Override
2030     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2031         assertNotSession();
2032         Semaphore notify = new Semaphore(0);
2033         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2034         final DataContainer<T> result = new DataContainer<T>();
2035         scheduleRequest(request, new Procedure<T>() {
2036
2037             @Override
2038             public void exception(Throwable t) {
2039               exception.set(t);
2040             }
2041
2042             @Override
2043             public void execute(T t) {
2044               result.set(t);
2045             }
2046
2047         }, notify);
2048         acquire(notify, request);
2049         if(exception.get() != null) {
2050             Throwable t = exception.get();
2051             if(t instanceof DatabaseException) throw (DatabaseException)t;
2052             else throw new DatabaseException(t);
2053         }
2054         return result.get();
2055     }
2056
2057     @Override
2058     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2059         assertNotSession();
2060         Semaphore notify = new Semaphore(0);
2061         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2062         scheduleRequest(request, e -> exception.set(e), notify);
2063         acquire(notify, request);
2064         if(exception.get() != null) throw exception.get();
2065     }
2066
2067     @Override
2068     public void syncRequest(WriteOnly request) throws DatabaseException  {
2069         assertNotSession();
2070         assertAlive();
2071         Semaphore notify = new Semaphore(0);
2072         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2073         scheduleRequest(request, e -> exception.set(e), notify);
2074         acquire(notify, request);
2075         if(exception.get() != null) throw exception.get();
2076     }
2077
2078     /*
2079      *
2080      * GraphRequestProcessor interface
2081      */
2082
2083     @Override
2084     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2085
2086         scheduleRequest(request, procedure, null, null);
2087
2088     }
2089
2090     @Override
2091     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2092
2093         scheduleRequest(request, procedure, null);
2094
2095     }
2096
2097     @Override
2098     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2099
2100         scheduleRequest(request, procedure, null, null, null);
2101
2102     }
2103
2104     @Override
2105     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2106
2107         scheduleRequest(request, callback, null);
2108
2109     }
2110
2111     @Override
2112     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2113
2114         scheduleRequest(request, procedure, null);
2115
2116     }
2117
2118     @Override
2119     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2120
2121         scheduleRequest(request, procedure, null);
2122
2123     }
2124
2125     @Override
2126     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2127
2128         scheduleRequest(request, procedure, null);
2129
2130     }
2131
2132     @Override
2133     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2134
2135         scheduleRequest(request, callback, null);
2136
2137     }
2138
2139     @Override
2140     public void asyncRequest(final Write r) {
2141         asyncRequest(r, null);
2142     }
2143
2144     @Override
2145     public void asyncRequest(final DelayedWrite r) {
2146         asyncRequest(r, null);
2147     }
2148
2149     @Override
2150     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2151
2152         scheduleRequest(request, callback, null);
2153
2154     }
2155
2156     @Override
2157     public void asyncRequest(final WriteOnly request) {
2158
2159         asyncRequest(request, null);
2160
2161     }
2162
2163     @Override
2164     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2165         r.request(this, procedure);
2166     }
2167
2168     @Override
2169     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2170         r.request(this, procedure);
2171     }
2172
2173     @Override
2174     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2175         r.request(this, procedure);
2176     }
2177
2178     @Override
2179     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2180         r.request(this, procedure);
2181     }
2182
2183     @Override
2184     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2185         r.request(this, procedure);
2186     }
2187
2188     @Override
2189     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2190         r.request(this, procedure);
2191     }
2192
2193     @Override
2194     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2195         return r.request(this);
2196     }
2197
2198     @Override
2199     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2200         return r.request(this);
2201     }
2202
2203     @Override
2204     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2205         r.request(this, procedure);
2206     }
2207
2208     @Override
2209     public <T> void async(WriteInterface<T> r) {
2210         r.request(this, new ProcedureAdapter<T>());
2211     }
2212
2213     //@Override
2214     public void incAsync() {
2215         state.incAsync();
2216     }
2217
2218     //@Override
2219     public void decAsync() {
2220         state.decAsync();
2221     }
2222
2223     public long getCluster(ResourceImpl resource) {
2224         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2225         return cluster.getClusterId();
2226     }
2227
2228     public long getCluster(int id) {
2229         if (clusterTable == null)
2230             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2231         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2232     }
2233
2234     public ResourceImpl getResource(int id) {
2235         return new ResourceImpl(resourceSupport, id);
2236     }
2237
2238     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2239         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2240         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2241         int key = proxy.getClusterKey();
2242         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2243         return new ResourceImpl(resourceSupport, resourceKey);
2244     }
2245
2246     public ResourceImpl getResource2(int id) {
2247         assert (id != 0);
2248         return new ResourceImpl(resourceSupport, id);
2249     }
2250
2251     final public int getId(ResourceImpl impl) {
2252         return impl.id;
2253     }
2254
2255     public static final Charset UTF8 = Charset.forName("utf-8");
2256
2257
2258
2259
2260     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2261         for(TransientGraph g : support.providers) {
2262             if(g.isPending(subject)) return false;
2263         }
2264         return true;
2265     }
2266
2267     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2268         for(TransientGraph g : support.providers) {
2269             if(g.isPending(subject, predicate)) return false;
2270         }
2271         return true;
2272     }
2273
2274     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2275
2276         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2277
2278             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2279
2280             @Override
2281             public void accept(ReadGraphImpl graph) {
2282                 if(ready.decrementAndGet() == 0) {
2283                     runnable.accept(graph);
2284                 }
2285             }
2286
2287         };
2288
2289         for(TransientGraph g : support.providers) {
2290             if(g.isPending(subject)) {
2291                 try {
2292                     g.load(graph, subject, composite);
2293                 } catch (DatabaseException e) {
2294                     e.printStackTrace();
2295                 }
2296             } else {
2297                 composite.accept(graph);
2298             }
2299         }
2300
2301         composite.accept(graph);
2302
2303     }
2304
2305     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2306
2307         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2308
2309             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2310
2311             @Override
2312             public void accept(ReadGraphImpl graph) {
2313                 if(ready.decrementAndGet() == 0) {
2314                     runnable.accept(graph);
2315                 }
2316             }
2317
2318         };
2319
2320         for(TransientGraph g : support.providers) {
2321             if(g.isPending(subject, predicate)) {
2322                 try {
2323                     g.load(graph, subject, predicate, composite);
2324                 } catch (DatabaseException e) {
2325                     e.printStackTrace();
2326                 }
2327             } else {
2328                 composite.accept(graph);
2329             }
2330         }
2331
2332         composite.accept(graph);
2333
2334     }
2335
2336 //    void dumpHeap() {
2337 //
2338 //        try {
2339 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2340 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2341 //                    "d:/heap" + flushCounter + ".txt", true);
2342 //        } catch (IOException e) {
2343 //            e.printStackTrace();
2344 //        }
2345 //
2346 //    }
2347
2348     void fireReactionsToSynchronize(ChangeSet cs) {
2349
2350         // Do not fire empty events
2351         if (cs.isEmpty())
2352             return;
2353
2354         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2355 //        g.state.barrier.inc();
2356
2357         try {
2358
2359             if (!cs.isEmpty()) {
2360                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2361                 for (ChangeListener l : changeListeners2) {
2362                     try {
2363                         l.graphChanged(e2);
2364                     } catch (Exception ex) {
2365                         ex.printStackTrace();
2366                     }
2367                 }
2368             }
2369
2370         } finally {
2371
2372 //            g.state.barrier.dec();
2373 //            g.waitAsync(null);
2374
2375         }
2376
2377     }
2378
2379     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2380         try {
2381
2382             // Do not fire empty events
2383             if (cs2.isEmpty())
2384                 return;
2385
2386 //            graph.restart();
2387 //            graph.state.barrier.inc();
2388
2389             try {
2390
2391                 if (!cs2.isEmpty()) {
2392
2393                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2394                     for (ChangeListener l : changeListeners2) {
2395                         try {
2396                             l.graphChanged(e2);
2397 //                            System.out.println("changelistener " + l);
2398                         } catch (Exception ex) {
2399                             ex.printStackTrace();
2400                         }
2401                     }
2402
2403                 }
2404
2405             } finally {
2406
2407 //                graph.state.barrier.dec();
2408 //                graph.waitAsync(null);
2409
2410             }
2411         } catch (Throwable t) {
2412             t.printStackTrace();
2413
2414         }
2415     }
2416     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2417
2418         try {
2419
2420             // Do not fire empty events
2421             if (cs2.isEmpty())
2422                 return;
2423
2424             // Do not fire on virtual requests
2425             if(graph.getProvider() != null)
2426                 return;
2427
2428             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2429
2430             try {
2431
2432                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2433                 for (ChangeListener l : metadataListeners) {
2434                     try {
2435                         l.graphChanged(e2);
2436                     } catch (Throwable ex) {
2437                         ex.printStackTrace();
2438                     }
2439                 }
2440
2441             } finally {
2442
2443             }
2444
2445         } catch (Throwable t) {
2446             t.printStackTrace();
2447         }
2448
2449     }
2450
2451
2452     /*
2453      * (non-Javadoc)
2454      *
2455      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2456      */
2457     @Override
2458     public <T> T getService(Class<T> api) {
2459         T t = peekService(api);
2460         if (t == null) {
2461             if (state.isClosed())
2462                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2463             throw new ServiceNotFoundException(this, api);
2464         }
2465         return t;
2466     }
2467
2468     /**
2469      * @return
2470      */
2471     protected abstract ServerInformation getCachedServerInformation();
2472
2473         private Class<?> serviceKey1 = null;
2474         private Class<?> serviceKey2 = null;
2475         private Object service1 = null;
2476         private Object service2 = null;
2477
2478     /*
2479      * (non-Javadoc)
2480      *
2481      * @see
2482      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2483      */
2484     @SuppressWarnings("unchecked")
2485     @Override
2486     public synchronized <T> T peekService(Class<T> api) {
2487
2488                 if(serviceKey1 == api) {
2489                         return (T)service1;
2490                 } else if (serviceKey2 == api) {
2491                         // Promote this key
2492                         Object result = service2;
2493                         service2 = service1;
2494                         serviceKey2 = serviceKey1;
2495                         service1 = result;
2496                         serviceKey1 = api;
2497                         return (T)result;
2498                 }
2499
2500         if (Layer0.class == api)
2501                 return (T) L0;
2502         if (ServerInformation.class == api)
2503             return (T) getCachedServerInformation();
2504         else if (WriteGraphImpl.class == api)
2505             return (T) writeState.getGraph();
2506         else if (ClusterBuilder.class == api)
2507             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2508         else if (ClusterBuilderFactory.class == api)
2509             return (T)new ClusterBuilderFactoryImpl(this);
2510
2511                 service2 = service1;
2512                 serviceKey2 = serviceKey1;
2513
2514         service1 = serviceLocator.peekService(api);
2515         serviceKey1 = api;
2516
2517         return (T)service1;
2518
2519     }
2520
2521     /*
2522      * (non-Javadoc)
2523      *
2524      * @see
2525      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2526      */
2527     @Override
2528     public boolean hasService(Class<?> api) {
2529         return serviceLocator.hasService(api);
2530     }
2531
2532     /**
2533      * @param api the api that must be implemented by the specified service
2534      * @param service the service implementation
2535      */
2536     @Override
2537     public <T> void registerService(Class<T> api, T service) {
2538         if(Layer0.class == api) {
2539                 L0 = (Layer0)service;
2540                 return;
2541         }
2542         serviceLocator.registerService(api, service);
2543         if (TransactionPolicySupport.class == api) {
2544             transactionPolicy = (TransactionPolicySupport)service;
2545             state.resetTransactionPolicy();
2546         }
2547         if (api == serviceKey1)
2548                 service1 = service;
2549         else if (api == serviceKey2)
2550                 service2 = service;
2551     }
2552
2553     // ----------------
2554     // SessionMonitor
2555     // ----------------
2556
2557     void fireSessionVariableChange(String variable) {
2558         for (MonitorHandler h : monitorHandlers) {
2559             MonitorContext ctx = monitorContexts.getLeft(h);
2560             assert ctx != null;
2561             // SafeRunner functionality repeated here to avoid dependency.
2562             try {
2563                 h.valuesChanged(ctx);
2564             } catch (Exception e) {
2565                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2566             } catch (LinkageError e) {
2567                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2568             }
2569         }
2570     }
2571
2572
2573     class ResourceSerializerImpl implements ResourceSerializer {
2574
2575         public long createRandomAccessId(int id) throws DatabaseException {
2576             if(id < 0)
2577                 return id;
2578             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2579             long cluster = getCluster(id);
2580             if (0 == cluster)
2581                 return 0; // Better to return 0 then invalid id.
2582             long result = ClusterTraitsBase.createResourceId(cluster, index);
2583             return result;
2584         }
2585
2586         @Override
2587         public long getRandomAccessId(Resource resource) throws DatabaseException {
2588
2589             ResourceImpl resourceImpl = (ResourceImpl) resource;
2590             return createRandomAccessId(resourceImpl.id);
2591
2592         }
2593
2594         @Override
2595         public int getTransientId(Resource resource) throws DatabaseException {
2596
2597             ResourceImpl resourceImpl = (ResourceImpl) resource;
2598             return resourceImpl.id;
2599
2600         }
2601
2602         @Override
2603         public String createRandomAccessId(Resource resource)
2604         throws InvalidResourceReferenceException {
2605
2606             if(resource == null) throw new IllegalArgumentException();
2607
2608             ResourceImpl resourceImpl = (ResourceImpl) resource;
2609
2610             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2611
2612             int r;
2613             try {
2614                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2615             } catch (DatabaseException e1) {
2616                 throw new InvalidResourceReferenceException(e1);
2617             }
2618             try {
2619                 // Serialize as '<resource index>_<cluster id>'
2620                 return "" + r + "_" + getCluster(resourceImpl);
2621             } catch (Throwable e) {
2622                 e.printStackTrace();
2623                 throw new InvalidResourceReferenceException(e);
2624             } finally {
2625             }
2626         }
2627
2628         public int getTransientId(long serialized) throws DatabaseException {
2629             if (serialized <= 0)
2630                 return (int)serialized;
2631             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2632             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2633             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2634             if (cluster == null)
2635                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2636             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2637             return key;
2638         }
2639
2640         @Override
2641         public Resource getResource(long randomAccessId) throws DatabaseException {
2642             return getResourceByKey(getTransientId(randomAccessId));
2643         }
2644
2645         @Override
2646         public Resource getResource(int transientId) throws DatabaseException {
2647             return getResourceByKey(transientId);
2648         }
2649
2650         @Override
2651         public Resource getResource(String randomAccessId)
2652         throws InvalidResourceReferenceException {
2653             try {
2654                 int i = randomAccessId.indexOf('_');
2655                 if (i == -1)
2656                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2657                             + randomAccessId + "'");
2658                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2659                 if(r < 0) return getResourceByKey(r);
2660                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2661                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2662                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2663                 if (cluster.hasResource(key, clusterTranslator))
2664                     return getResourceByKey(key);
2665             } catch (InvalidResourceReferenceException e) {
2666                 throw e;
2667             } catch (NumberFormatException e) {
2668                 throw new InvalidResourceReferenceException(e);
2669             } catch (Throwable e) {
2670                 e.printStackTrace();
2671                 throw new InvalidResourceReferenceException(e);
2672             } finally {
2673             }
2674             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2675         }
2676
2677         @Override
2678         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2679             try {
2680                 return true;
2681             } catch (Throwable e) {
2682                 e.printStackTrace();
2683                 throw new InvalidResourceReferenceException(e);
2684             } finally {
2685             }
2686         }
2687     }
2688
2689     // Copied from old SessionImpl
2690
2691     void check() {
2692         if (state.isClosed())
2693             throw new Error("Session closed.");
2694     }
2695
2696
2697
2698     public ResourceImpl getNewResource(long clusterId) {
2699         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2700         int newId;
2701         try {
2702             newId = cluster.createResource(clusterTranslator);
2703         } catch (DatabaseException e) {
2704             Logger.defaultLogError(e);
2705             return null;
2706         }
2707         return new ResourceImpl(resourceSupport, newId);
2708     }
2709
2710     public ResourceImpl getNewResource(Resource clusterSet)
2711     throws DatabaseException {
2712         long resourceId = clusterSet.getResourceId();
2713         Long clusterId = clusterSetsSupport.get(resourceId);
2714         if (null == clusterId)
2715             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2716         if (Constants.NewClusterId == clusterId) {
2717             clusterId = getService(ClusteringSupport.class).createCluster();
2718             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2719                 createdClusters.add(clusterId);
2720             clusterSetsSupport.put(resourceId, clusterId);
2721             return getNewResource(clusterId);
2722         } else {
2723             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2724             ResourceImpl result;
2725             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2726                 clusterId = getService(ClusteringSupport.class).createCluster();
2727                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2728                     createdClusters.add(clusterId);
2729                 clusterSetsSupport.put(resourceId, clusterId);
2730                 return getNewResource(clusterId);
2731             } else {
2732                 result = getNewResource(clusterId);
2733                 int resultKey = querySupport.getId(result);
2734                 long resultCluster = querySupport.getClusterId(resultKey);
2735                 if (clusterId != resultCluster)
2736                     clusterSetsSupport.put(resourceId, resultCluster);
2737                 return result;
2738             }
2739         }
2740     }
2741
2742     public void getNewClusterSet(Resource clusterSet)
2743     throws DatabaseException {
2744         if (DEBUG)
2745             System.out.println("new cluster set=" + clusterSet);
2746         long resourceId = clusterSet.getResourceId();
2747         if (clusterSetsSupport.containsKey(resourceId))
2748             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2749         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2750     }
2751     public boolean containsClusterSet(Resource clusterSet)
2752     throws ServiceException {
2753         long resourceId = clusterSet.getResourceId();
2754         return clusterSetsSupport.containsKey(resourceId);
2755     }
2756     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2757         Resource r = defaultClusterSet;
2758         defaultClusterSet = clusterSet;
2759         return r;
2760     }
2761     void printDiagnostics() {
2762
2763         if (DIAGNOSTICS) {
2764
2765             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2766             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2767             for(ClusterI cluster : clusterTable.getClusters()) {
2768                 try {
2769                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2770                         residentClusters.set(residentClusters.get() + 1);
2771                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2772                     }
2773                 } catch (DatabaseException e) {
2774                     Logger.defaultLogError(e);
2775                 }
2776             }
2777
2778             System.out.println("--------------------------------");
2779             System.out.println("Cluster information:");
2780             System.out.println("-amount of resident clusters=" + residentClusters.get());
2781             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2782
2783             for(ClusterI cluster : clusterTable.getClusters()) {
2784                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2785                 try {
2786                     if (!cluster.isLoaded())
2787                         System.out.println("not loaded]");
2788                     else if (cluster.isEmpty())
2789                         System.out.println("is empty]");
2790                     else {
2791                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2792                         System.out.print(",references=" + cluster.getReferenceCount());
2793                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2794                     }
2795                 } catch (DatabaseException e) {
2796                     Logger.defaultLogError(e);
2797                     System.out.println("is corrupted]");
2798                 }
2799             }
2800
2801             queryProvider2.printDiagnostics();
2802             System.out.println("--------------------------------");
2803         }
2804
2805     }
2806
2807     public void fireStartReadTransaction() {
2808
2809         if (DIAGNOSTICS)
2810             System.out.println("StartReadTransaction");
2811
2812         for (SessionEventListener listener : eventListeners) {
2813             try {
2814                 listener.readTransactionStarted();
2815             } catch (Throwable t) {
2816                 t.printStackTrace();
2817             }
2818         }
2819
2820     }
2821
2822     public void fireFinishReadTransaction() {
2823
2824         if (DIAGNOSTICS)
2825             System.out.println("FinishReadTransaction");
2826
2827         for (SessionEventListener listener : eventListeners) {
2828             try {
2829                 listener.readTransactionFinished();
2830             } catch (Throwable t) {
2831                 t.printStackTrace();
2832             }
2833         }
2834
2835     }
2836
2837     public void fireStartWriteTransaction() {
2838
2839         if (DIAGNOSTICS)
2840             System.out.println("StartWriteTransaction");
2841
2842         for (SessionEventListener listener : eventListeners) {
2843             try {
2844                 listener.writeTransactionStarted();
2845             } catch (Throwable t) {
2846                 t.printStackTrace();
2847             }
2848         }
2849
2850     }
2851
2852     public void fireFinishWriteTransaction() {
2853
2854         if (DIAGNOSTICS)
2855             System.out.println("FinishWriteTransaction");
2856
2857         for (SessionEventListener listener : eventListeners) {
2858             try {
2859                 listener.writeTransactionFinished();
2860             } catch (Throwable t) {
2861                 t.printStackTrace();
2862             }
2863         }
2864
2865         Indexing.resetDependenciesIndexingDisabled();
2866
2867     }
2868
2869     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2870         throw new Error("Not supported at the moment.");
2871     }
2872
2873     State getState() {
2874         return state;
2875     }
2876
2877     ClusterTable getClusterTable() {
2878         return clusterTable;
2879     }
2880
2881     public GraphSession getGraphSession() {
2882         return graphSession;
2883     }
2884
2885     public QueryProcessor getQueryProvider2() {
2886         return queryProvider2;
2887     }
2888
2889     ClientChangesImpl getClientChanges() {
2890         return clientChanges;
2891     }
2892
2893     boolean getWriteOnly() {
2894         return writeOnly;
2895     }
2896
2897     static int counter = 0;
2898
2899     public void onClusterLoaded(long clusterId) {
2900
2901         clusterTable.updateSize();
2902
2903     }
2904
2905
2906
2907
2908     /*
2909      * Implementation of the interface RequestProcessor
2910      */
2911
2912     @Override
2913     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2914
2915         assertNotSession();
2916         assertAlive();
2917
2918         assert(request != null);
2919
2920         final DataContainer<T> result = new DataContainer<T>();
2921         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2922
2923         syncRequest(request, new AsyncProcedure<T>() {
2924
2925             @Override
2926             public void execute(AsyncReadGraph graph, T t) {
2927                 result.set(t);
2928             }
2929
2930             @Override
2931             public void exception(AsyncReadGraph graph, Throwable t) {
2932                 exception.set(t);
2933             }
2934
2935         });
2936
2937         Throwable t = exception.get();
2938         if(t != null) {
2939             if(t instanceof DatabaseException) throw (DatabaseException)t;
2940             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2941         }
2942
2943         return result.get();
2944
2945     }
2946
2947 //    @Override
2948 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2949 //        assertNotSession();
2950 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2951 //    }
2952
2953     @Override
2954     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2955         assertNotSession();
2956         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2957     }
2958
2959     @Override
2960     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2961         assertNotSession();
2962         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2963     }
2964
2965     @Override
2966     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2967         assertNotSession();
2968         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2969     }
2970
2971     @Override
2972     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2973         assertNotSession();
2974         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2975     }
2976
2977     @Override
2978     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2979
2980         assertNotSession();
2981
2982         assert(request != null);
2983
2984         final DataContainer<T> result = new DataContainer<T>();
2985         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2986
2987         syncRequest(request, new AsyncProcedure<T>() {
2988
2989             @Override
2990             public void execute(AsyncReadGraph graph, T t) {
2991                 result.set(t);
2992             }
2993
2994             @Override
2995             public void exception(AsyncReadGraph graph, Throwable t) {
2996                 exception.set(t);
2997             }
2998
2999         });
3000
3001         Throwable t = exception.get();
3002         if(t != null) {
3003             if(t instanceof DatabaseException) throw (DatabaseException)t;
3004             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3005         }
3006
3007         return result.get();
3008
3009     }
3010
3011     @Override
3012     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3013         assertNotSession();
3014         return syncRequest(request, (AsyncProcedure<T>)procedure);
3015     }
3016
3017     @Override
3018     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3019         assertNotSession();
3020         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3021     }
3022
3023     @Override
3024     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3025         assertNotSession();
3026         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3027     }
3028
3029     @Override
3030     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3031         assertNotSession();
3032         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3033     }
3034
3035     @Override
3036     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3037         assertNotSession();
3038         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3039     }
3040
3041     @Override
3042     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3043
3044         assertNotSession();
3045
3046         assert(request != null);
3047
3048         final ArrayList<T> result = new ArrayList<T>();
3049         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3050
3051         syncRequest(request, new AsyncMultiProcedure<T>() {
3052
3053             @Override
3054             public void execute(AsyncReadGraph graph, T t) {
3055                 synchronized(result) {
3056                     result.add(t);
3057                 }
3058             }
3059
3060             @Override
3061             public void finished(AsyncReadGraph graph) {
3062             }
3063
3064             @Override
3065             public void exception(AsyncReadGraph graph, Throwable t) {
3066                 exception.set(t);
3067             }
3068
3069
3070         });
3071
3072         Throwable t = exception.get();
3073         if(t != null) {
3074             if(t instanceof DatabaseException) throw (DatabaseException)t;
3075             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3076         }
3077
3078         return result;
3079
3080     }
3081
3082     @Override
3083     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3084         assertNotSession();
3085         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3086     }
3087
3088     @Override
3089     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3090         assertNotSession();
3091         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3092     }
3093
3094     @Override
3095     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3096         assertNotSession();
3097         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3098     }
3099
3100     @Override
3101     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3102         assertNotSession();
3103         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3104     }
3105
3106     @Override
3107     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3108         assertNotSession();
3109         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3110     }
3111
3112     @Override
3113     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3114
3115         assertNotSession();
3116
3117         assert(request != null);
3118
3119         final ArrayList<T> result = new ArrayList<T>();
3120         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3121
3122         syncRequest(request, new AsyncMultiProcedure<T>() {
3123
3124             @Override
3125             public void execute(AsyncReadGraph graph, T t) {
3126                 synchronized(result) {
3127                     result.add(t);
3128                 }
3129             }
3130
3131             @Override
3132             public void finished(AsyncReadGraph graph) {
3133             }
3134
3135             @Override
3136             public void exception(AsyncReadGraph graph, Throwable t) {
3137                 exception.set(t);
3138             }
3139
3140
3141         });
3142
3143         Throwable t = exception.get();
3144         if(t != null) {
3145             if(t instanceof DatabaseException) throw (DatabaseException)t;
3146             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3147         }
3148
3149         return result;
3150
3151     }
3152
3153     @Override
3154     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3155         assertNotSession();
3156         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3157     }
3158
3159     @Override
3160     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3161         assertNotSession();
3162         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3163     }
3164
3165     @Override
3166     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3167         assertNotSession();
3168        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3169     }
3170
3171     @Override
3172     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3173         assertNotSession();
3174         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3175     }
3176
3177     @Override
3178     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3179         assertNotSession();
3180         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3181     }
3182
3183
3184     @Override
3185     public <T> void asyncRequest(Read<T> request) {
3186
3187         asyncRequest(request, new ProcedureAdapter<T>() {
3188             @Override
3189             public void exception(Throwable t) {
3190                 t.printStackTrace();
3191             }
3192         });
3193
3194     }
3195
3196     @Override
3197     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3198         asyncRequest(request, (AsyncProcedure<T>)procedure);
3199     }
3200
3201     @Override
3202     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3203         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3204     }
3205
3206     @Override
3207     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3208         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3209     }
3210
3211     @Override
3212     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3213         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3214     }
3215
3216     @Override
3217     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3218         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3219     }
3220
3221     @Override
3222     final public <T> void asyncRequest(final AsyncRead<T> request) {
3223
3224         assert(request != null);
3225
3226         asyncRequest(request, new ProcedureAdapter<T>() {
3227             @Override
3228             public void exception(Throwable t) {
3229                 t.printStackTrace();
3230             }
3231         });
3232
3233     }
3234
3235     @Override
3236     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3237         scheduleRequest(request, procedure, procedure, null);
3238     }
3239
3240     @Override
3241     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3242         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3243     }
3244
3245     @Override
3246     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3247         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3248     }
3249
3250     @Override
3251     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3252         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3253     }
3254
3255     @Override
3256     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3257         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3258     }
3259
3260     @Override
3261     public <T> void asyncRequest(MultiRead<T> request) {
3262
3263         assert(request != null);
3264
3265         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3266             @Override
3267             public void exception(AsyncReadGraph graph, Throwable t) {
3268                 t.printStackTrace();
3269             }
3270         });
3271
3272     }
3273
3274     @Override
3275     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3276         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3277     }
3278
3279     @Override
3280     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3281         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3282     }
3283
3284     @Override
3285     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3286         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3287     }
3288
3289     @Override
3290     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3291         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3292     }
3293
3294     @Override
3295     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3296         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3297     }
3298
3299     @Override
3300     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3301
3302         assert(request != null);
3303
3304         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3305             @Override
3306             public void exception(AsyncReadGraph graph, Throwable t) {
3307                 t.printStackTrace();
3308             }
3309         });
3310
3311     }
3312
3313     @Override
3314     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3315         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3316     }
3317
3318     @Override
3319     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3320         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3321     }
3322
3323     @Override
3324     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3325         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3326     }
3327
3328     @Override
3329     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3330         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3331     }
3332
3333     @Override
3334     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3335         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3336     }
3337
3338     @Override
3339     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3340         assertNotSession();
3341         throw new Error("Not implemented!");
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3346         throw new Error("Not implemented!");
3347     }
3348
3349     @Override
3350     final public <T> void asyncRequest(final ExternalRead<T> request) {
3351
3352         assert(request != null);
3353
3354         asyncRequest(request, new ProcedureAdapter<T>() {
3355             @Override
3356             public void exception(Throwable t) {
3357                 t.printStackTrace();
3358             }
3359         });
3360
3361     }
3362
3363     @Override
3364     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3365         asyncRequest(request, (Procedure<T>)procedure);
3366     }
3367
3368
3369
3370     void check(Throwable t) throws DatabaseException {
3371         if(t != null) {
3372             if(t instanceof DatabaseException) throw (DatabaseException)t;
3373             else throw new DatabaseException("Unexpected exception", t);
3374         }
3375     }
3376
3377     void check(DataContainer<Throwable> container) throws DatabaseException {
3378         Throwable t = container.get();
3379         if(t != null) {
3380             if(t instanceof DatabaseException) throw (DatabaseException)t;
3381             else throw new DatabaseException("Unexpected exception", t);
3382         }
3383     }
3384
3385
3386
3387
3388
3389
3390     boolean sameProvider(Write request) {
3391         if(writeState.getGraph().provider != null) {
3392             return writeState.getGraph().provider.equals(request.getProvider());
3393         } else {
3394             return request.getProvider() == null;
3395         }
3396     }
3397
3398     boolean plainWrite(WriteGraphImpl graph) {
3399         if(graph == null) return false;
3400         if(graph.writeSupport.writeOnly()) return false;
3401         return true;
3402     }
3403
3404
3405     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3406     private void assertNotSession() throws DatabaseException {
3407         Thread current = Thread.currentThread();
3408         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3409     }
3410
3411     void assertAlive() {
3412         if (!state.isAlive())
3413             throw new RuntimeDatabaseException("Session has been shut down.");
3414     }
3415
3416     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3417         return querySupport.getValueStream(graph, querySupport.getId(resource));
3418     }
3419
3420     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3421         return querySupport.getValue(graph, querySupport.getId(resource));
3422     }
3423
3424     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3425         acquire(semaphore, request, null);
3426     }
3427
3428     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3429         assertAlive();
3430         try {
3431             long tryCount = 0;
3432             // Loop until the semaphore is acquired reporting requests that take a long time.
3433             while (true) {
3434
3435                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3436                     // Acquired OK.
3437                     return;
3438                 } else {
3439                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3440                         ++tryCount;
3441                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3442                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3443                                 * tryCount;
3444                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3445                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3446                     }
3447                 }
3448
3449                 assertAlive();
3450
3451             }
3452         } catch (InterruptedException e) {
3453             e.printStackTrace();
3454             // FIXME: Should perhaps do something else in this case ??
3455         }
3456     }
3457
3458
3459
3460 //    @Override
3461 //    public boolean holdOnToTransactionAfterCancel() {
3462 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3463 //    }
3464 //
3465 //    @Override
3466 //    public boolean holdOnToTransactionAfterCommit() {
3467 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3468 //    }
3469 //
3470 //    @Override
3471 //    public boolean holdOnToTransactionAfterRead() {
3472 //        return transactionPolicy.holdOnToTransactionAfterRead();
3473 //    }
3474 //
3475 //    @Override
3476 //    public void onRelinquish() {
3477 //        transactionPolicy.onRelinquish();
3478 //    }
3479 //
3480 //    @Override
3481 //    public void onRelinquishDone() {
3482 //        transactionPolicy.onRelinquishDone();
3483 //    }
3484 //
3485 //    @Override
3486 //    public void onRelinquishError() {
3487 //        transactionPolicy.onRelinquishError();
3488 //    }
3489
3490
3491     @Override
3492     public Session getSession() {
3493         return this;
3494     }
3495
3496
3497     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3498     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3499
3500     public void ceased(int thread) {
3501
3502         requestManager.ceased(thread);
3503
3504     }
3505
3506     public int getAmountOfQueryThreads() {
3507         // This must be a power of two
3508         return 1;
3509 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3510     }
3511
3512     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3513
3514         return new ResourceImpl(resourceSupport, key);
3515
3516     }
3517
3518     public void acquireWriteOnly() {
3519         writeOnly = true;
3520     }
3521
3522     public void releaseWriteOnly(ReadGraphImpl graph) {
3523         writeOnly = false;
3524         queryProvider2.releaseWrite(graph);
3525     }
3526
3527     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3528
3529         long start = System.nanoTime();
3530
3531         while(dirtyPrimitives) {
3532             dirtyPrimitives = false;
3533             getQueryProvider2().performDirtyUpdates(writer);
3534             getQueryProvider2().performScheduledUpdates(writer);
3535         }
3536
3537         fireMetadataListeners(writer, clientChanges);
3538
3539         if(DebugPolicy.PERFORMANCE_DATA) {
3540             long end = System.nanoTime();
3541             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3542         }
3543
3544     }
3545
3546     void removeTemporaryData() {
3547         File platform = Platform.getLocation().toFile();
3548         File tempFiles = new File(platform, "tempFiles");
3549         File temp = new File(tempFiles, "db");
3550         if(!temp.exists()) 
3551             return;
3552         try {
3553             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3554                 @Override
3555                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3556                     try {
3557                         Files.delete(file);
3558                     } catch (IOException e) {
3559                         Logger.defaultLogError(e);
3560                     }
3561                     return FileVisitResult.CONTINUE;
3562                 }
3563             });
3564         } catch (IOException e) {
3565             Logger.defaultLogError(e);
3566         }
3567     }
3568
3569     public void handleCreatedClusters() {
3570         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3571             createdClusters.forEach(new TLongProcedure() {
3572
3573                 @Override
3574                 public boolean execute(long value) {
3575                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3576                     cluster.setImmutable(true, clusterTranslator);
3577                     return true;
3578                 }
3579             });
3580         }
3581         createdClusters.clear();
3582     }
3583
3584     @Override
3585     public Object getModificationCounter() {
3586         return queryProvider2.modificationCounter;
3587     }
3588     
3589     @Override
3590     public void markUndoPoint() {
3591         state.setCombine(false);
3592     }
3593
3594 }