]> gerrit.simantics Code Review - simantics/platform.git/blob - bundles/org.simantics.db.procore/src/fi/vtt/simantics/procore/internal/SessionImplSocket.java
Some enhancements made by Antti for multiple readers
[simantics/platform.git] / bundles / org.simantics.db.procore / src / fi / vtt / simantics / procore / internal / SessionImplSocket.java
1 /*******************************************************************************
2  * Copyright (c) 2007, 2010 Association for Decentralized Information Management
3  * in Industry THTH ry.
4  * All rights reserved. This program and the accompanying materials
5  * are made available under the terms of the Eclipse Public License v1.0
6  * which accompanies this distribution, and is available at
7  * http://www.eclipse.org/legal/epl-v10.html
8  *
9  * Contributors:
10  *     VTT Technical Research Centre of Finland - initial API and implementation
11  *******************************************************************************/
12 package fi.vtt.simantics.procore.internal;
13
14 import java.io.File;
15 import java.io.IOException;
16 import java.io.InputStream;
17 import java.nio.charset.Charset;
18 import java.nio.file.FileVisitResult;
19 import java.nio.file.Files;
20 import java.nio.file.Path;
21 import java.nio.file.SimpleFileVisitor;
22 import java.nio.file.attribute.BasicFileAttributes;
23 import java.util.ArrayList;
24 import java.util.Collection;
25 import java.util.HashSet;
26 import java.util.TreeMap;
27 import java.util.concurrent.CopyOnWriteArrayList;
28 import java.util.concurrent.Semaphore;
29 import java.util.concurrent.atomic.AtomicInteger;
30 import java.util.function.Consumer;
31
32 import org.eclipse.core.runtime.Platform;
33 import org.simantics.databoard.Bindings;
34 import org.simantics.db.AsyncReadGraph;
35 import org.simantics.db.ChangeSet;
36 import org.simantics.db.DevelopmentKeys;
37 import org.simantics.db.Disposable;
38 import org.simantics.db.ExternalValueSupport;
39 import org.simantics.db.Metadata;
40 import org.simantics.db.MonitorContext;
41 import org.simantics.db.MonitorHandler;
42 import org.simantics.db.Resource;
43 import org.simantics.db.ResourceSerializer;
44 import org.simantics.db.Session;
45 import org.simantics.db.SessionManager;
46 import org.simantics.db.SessionVariables;
47 import org.simantics.db.VirtualGraph;
48 import org.simantics.db.WriteGraph;
49 import org.simantics.db.authentication.UserAuthenticationAgent;
50 import org.simantics.db.authentication.UserAuthenticator;
51 import org.simantics.db.common.Indexing;
52 import org.simantics.db.common.TransactionPolicyRelease;
53 import org.simantics.db.common.procedure.adapter.AsyncMultiProcedureAdapter;
54 import org.simantics.db.common.procedure.adapter.ProcedureAdapter;
55 import org.simantics.db.common.procedure.wrapper.NoneToAsyncListener;
56 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiListener;
57 import org.simantics.db.common.procedure.wrapper.NoneToAsyncMultiProcedure;
58 import org.simantics.db.common.procedure.wrapper.NoneToAsyncProcedure;
59 import org.simantics.db.common.procedure.wrapper.SyncToAsyncListener;
60 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiListener;
61 import org.simantics.db.common.procedure.wrapper.SyncToAsyncMultiProcedure;
62 import org.simantics.db.common.procedure.wrapper.SyncToAsyncProcedure;
63 import org.simantics.db.common.service.ServiceActivityMonitorImpl;
64 import org.simantics.db.common.service.ServiceActivityUpdaterForWriteTransactions;
65 import org.simantics.db.common.utils.Logger;
66 import org.simantics.db.event.ChangeEvent;
67 import org.simantics.db.event.ChangeListener;
68 import org.simantics.db.event.SessionEventListener;
69 import org.simantics.db.exception.CancelTransactionException;
70 import org.simantics.db.exception.ClusterSetExistException;
71 import org.simantics.db.exception.DatabaseException;
72 import org.simantics.db.exception.ImmutableException;
73 import org.simantics.db.exception.InvalidResourceReferenceException;
74 import org.simantics.db.exception.ResourceNotFoundException;
75 import org.simantics.db.exception.RuntimeDatabaseException;
76 import org.simantics.db.exception.ServiceException;
77 import org.simantics.db.exception.ServiceNotFoundException;
78 import org.simantics.db.impl.ClusterBase;
79 import org.simantics.db.impl.ClusterI;
80 import org.simantics.db.impl.ClusterTraitsBase;
81 import org.simantics.db.impl.ClusterTranslator;
82 import org.simantics.db.impl.ResourceImpl;
83 import org.simantics.db.impl.TransientGraph;
84 import org.simantics.db.impl.VirtualGraphImpl;
85 import org.simantics.db.impl.graph.DelayedWriteGraph;
86 import org.simantics.db.impl.graph.ReadGraphImpl;
87 import org.simantics.db.impl.graph.WriteGraphImpl;
88 import org.simantics.db.impl.graph.WriteSupport;
89 import org.simantics.db.impl.internal.RandomAccessValueSupport;
90 import org.simantics.db.impl.procedure.ResultCallWrappedQueryProcedure4;
91 import org.simantics.db.impl.procedure.ResultCallWrappedSingleQueryProcedure4;
92 import org.simantics.db.impl.query.QueryCache;
93 import org.simantics.db.impl.query.QueryCacheBase;
94 import org.simantics.db.impl.query.QueryProcessor;
95 import org.simantics.db.impl.query.QueryProcessor.SessionRead;
96 import org.simantics.db.impl.query.QueryProcessor.SessionTask;
97 import org.simantics.db.impl.service.QueryDebug;
98 import org.simantics.db.impl.support.VirtualGraphServerSupport;
99 import org.simantics.db.impl.support.WriteRequestScheduleSupport;
100 import org.simantics.db.procedure.AsyncListener;
101 import org.simantics.db.procedure.AsyncMultiListener;
102 import org.simantics.db.procedure.AsyncMultiProcedure;
103 import org.simantics.db.procedure.AsyncProcedure;
104 import org.simantics.db.procedure.Listener;
105 import org.simantics.db.procedure.ListenerBase;
106 import org.simantics.db.procedure.MultiListener;
107 import org.simantics.db.procedure.MultiProcedure;
108 import org.simantics.db.procedure.Procedure;
109 import org.simantics.db.procedure.SyncListener;
110 import org.simantics.db.procedure.SyncMultiListener;
111 import org.simantics.db.procedure.SyncMultiProcedure;
112 import org.simantics.db.procedure.SyncProcedure;
113 import org.simantics.db.procore.cluster.ClusterImpl;
114 import org.simantics.db.procore.cluster.ClusterTraits;
115 import org.simantics.db.procore.protocol.Constants;
116 import org.simantics.db.procore.protocol.DebugPolicy;
117 import org.simantics.db.request.AsyncMultiRead;
118 import org.simantics.db.request.AsyncRead;
119 import org.simantics.db.request.DelayedWrite;
120 import org.simantics.db.request.DelayedWriteResult;
121 import org.simantics.db.request.ExternalRead;
122 import org.simantics.db.request.MultiRead;
123 import org.simantics.db.request.Read;
124 import org.simantics.db.request.ReadInterface;
125 import org.simantics.db.request.Write;
126 import org.simantics.db.request.WriteInterface;
127 import org.simantics.db.request.WriteOnly;
128 import org.simantics.db.request.WriteOnlyResult;
129 import org.simantics.db.request.WriteResult;
130 import org.simantics.db.request.WriteTraits;
131 import org.simantics.db.service.ByteReader;
132 import org.simantics.db.service.ClusterBuilder;
133 import org.simantics.db.service.ClusterBuilderFactory;
134 import org.simantics.db.service.ClusterControl;
135 import org.simantics.db.service.ClusterSetsSupport;
136 import org.simantics.db.service.ClusterUID;
137 import org.simantics.db.service.ClusteringSupport;
138 import org.simantics.db.service.CollectionSupport;
139 import org.simantics.db.service.DebugSupport;
140 import org.simantics.db.service.DirectQuerySupport;
141 import org.simantics.db.service.GraphChangeListenerSupport;
142 import org.simantics.db.service.InitSupport;
143 import org.simantics.db.service.LifecycleSupport;
144 import org.simantics.db.service.ManagementSupport;
145 import org.simantics.db.service.QueryControl;
146 import org.simantics.db.service.SerialisationSupport;
147 import org.simantics.db.service.ServerInformation;
148 import org.simantics.db.service.ServiceActivityMonitor;
149 import org.simantics.db.service.SessionEventSupport;
150 import org.simantics.db.service.SessionMonitorSupport;
151 import org.simantics.db.service.SessionUserSupport;
152 import org.simantics.db.service.StatementSupport;
153 import org.simantics.db.service.TransactionPolicySupport;
154 import org.simantics.db.service.TransactionSupport;
155 import org.simantics.db.service.TransferableGraphSupport;
156 import org.simantics.db.service.UndoRedoSupport;
157 import org.simantics.db.service.VirtualGraphSupport;
158 import org.simantics.db.service.XSupport;
159 import org.simantics.layer0.Layer0;
160 import org.simantics.utils.DataContainer;
161 import org.simantics.utils.Development;
162 import org.simantics.utils.threads.logger.ITask;
163 import org.simantics.utils.threads.logger.ThreadLogger;
164
165 import gnu.trove.procedure.TLongProcedure;
166 import gnu.trove.set.hash.TLongHashSet;
167
168
169 public abstract class SessionImplSocket implements Session, WriteRequestScheduleSupport {
170
171     protected static final boolean DEBUG        = false;
172
173     private static final boolean DIAGNOSTICS       = false;
174
175     private TransactionPolicySupport transactionPolicy;
176
177     final private ClusterControl clusterControl;
178
179     final protected BuiltinSupportImpl builtinSupport;
180     final protected VirtualGraphServerSupportImpl virtualGraphServerSupport;
181     final protected ClusterSetsSupport clusterSetsSupport;
182     final protected LifecycleSupportImpl lifecycleSupport;
183
184     protected QuerySupportImpl querySupport;
185     protected ResourceSupportImpl resourceSupport;
186     protected WriteSupport writeSupport;
187     public ClusterTranslator clusterTranslator;
188
189     boolean dirtyPrimitives = false;
190
191     public static final int SERVICE_MODE_CREATE = 2;
192     public static final int SERVICE_MODE_ALLOW = 1;
193     public int serviceMode = 0;
194     public boolean createdImmutableClusters = false;
195     public TLongHashSet createdClusters = new TLongHashSet();
196
197     private Layer0 L0;
198
199     /**
200      * The service locator maintained by the workbench. These services are
201      * initialized during workbench during the <code>init</code> method.
202      */
203     final protected ServiceLocatorImpl                           serviceLocator     = new ServiceLocatorImpl();
204
205     final public ResourceSerializerImpl                       resourceSerializer = new ResourceSerializerImpl();
206
207     final CopyOnWriteArrayList<ChangeListener>         changeListeners2   = new CopyOnWriteArrayList<ChangeListener>();
208
209     final CopyOnWriteArrayList<ChangeListener>         metadataListeners   = new CopyOnWriteArrayList<ChangeListener>();
210
211     final CopyOnWriteArrayList<SessionEventListener>   eventListeners     = new CopyOnWriteArrayList<SessionEventListener>();
212
213     final HashSet<Thread>                              sessionThreads     = new HashSet<Thread>();
214
215     final BijectionMap<MonitorContext, MonitorHandler> monitorContexts    = new BijectionMap<MonitorContext, MonitorHandler>();
216
217     final protected State                                        state              = new State();
218
219     protected GraphSession                                       graphSession       = null;
220
221     protected SessionManager                                     sessionManagerImpl = null;
222
223     protected UserAuthenticationAgent                            authAgent          = null;
224
225     protected UserAuthenticator                                  authenticator      = null;
226
227     protected Resource                                           user               = null;
228
229     protected ClusterStream                                      clusterStream      = null;
230
231     protected SessionRequestManager                         requestManager = null;
232
233     public ClusterTable                                       clusterTable       = null;
234
235     public QueryProcessor                                     queryProvider2 = null;
236
237     ClientChangesImpl                                  clientChanges      = null;
238
239     MonitorHandler[]                                   monitorHandlers    = new MonitorHandler[0];
240
241 //    protected long                                               newClusterId       = Constants.NullClusterId;
242
243     protected int                                                flushCounter       = 0;
244
245     protected boolean                                            writeOnly          = false;
246
247     WriteState<?>                                                writeState = null;
248     WriteStateBase<?>                                            delayedWriteState = null;
249     protected Resource defaultClusterSet = null; // If not null then used for newResource().
250
251     public SessionImplSocket(SessionManager sessionManagerImpl, UserAuthenticationAgent authAgent) {
252
253 //        if (authAgent == null)
254 //            throw new IllegalArgumentException("null authentication agent");
255
256         File t = StaticSessionProperties.virtualGraphStoragePath;
257         if (null == t)
258             t = new File(".");
259         this.clusterTable = new ClusterTable(this, t);
260         this.builtinSupport = new BuiltinSupportImpl(this);
261         this.sessionManagerImpl = sessionManagerImpl;
262         this.user = null;
263 //        this.authAgent = authAgent;
264
265         serviceLocator.registerService(Session.class, this);
266         serviceLocator.registerService(InitSupport.class, new InitSupportImpl(this));
267         serviceLocator.registerService(ManagementSupport.class, new ManagementSupportImpl(this));
268         serviceLocator.registerService(QueryControl.class, new QueryControlImpl(this));
269         serviceLocator.registerService(SessionUserSupport.class, new SessionUserSupportImpl(this));
270         serviceLocator.registerService(GraphChangeListenerSupport.class, new GraphChangeListenerSupportImpl(this));
271         serviceLocator.registerService(SessionEventSupport.class, new SessionEventSupportImpl(this));
272         serviceLocator.registerService(SerialisationSupport.class, new SerialisationSupportImpl(this));
273         serviceLocator.registerService(UndoRedoSupport.class, new UndoRedoSupportImpl(this));
274         serviceLocator.registerService(ClusteringSupport.class, new ClusteringSupportImpl(this));
275         serviceLocator.registerService(TransactionSupport.class, new TransactionSupportImpl(this));
276         serviceLocator.registerService(SessionMonitorSupport.class, new SessionMonitorSupportImpl(this));
277         serviceLocator.registerService(TransferableGraphSupport.class, new TransferableGraphSupportImpl(this));
278         serviceLocator.registerService(QueryDebug.class, new QueryDebugImpl(this));
279         serviceLocator.registerService(CollectionSupport.class, new CollectionSupportImpl(this));
280         serviceLocator.registerService(StatementSupport.class, new StatementSupportImpl(this));
281         serviceLocator.registerService(DirectQuerySupport.class, new DirectQuerySupportImpl(this));
282         serviceLocator.registerService(XSupport.class, new XSupportImpl(this));
283         serviceLocator.registerService(DebugSupport.class, new DebugSupportImpl());
284         serviceLocator.registerService(ExternalValueSupport.class, new ExternalValueSupportImpl(this));
285         serviceLocator.registerService(RandomAccessValueSupport.class, new RandomAccessValueSupportImpl());
286         serviceLocator.registerService(ServiceActivityMonitor.class, new ServiceActivityMonitorImpl());
287         ServiceActivityUpdaterForWriteTransactions.register(this);
288
289         this.virtualGraphServerSupport = new VirtualGraphServerSupportImpl(this, t);
290         serviceLocator.registerService(VirtualGraphSupport.class, virtualGraphServerSupport);
291         serviceLocator.registerService(VirtualGraphServerSupport.class, virtualGraphServerSupport);
292         this.lifecycleSupport = new LifecycleSupportImpl(this);
293         serviceLocator.registerService(LifecycleSupport.class, lifecycleSupport);
294         this.transactionPolicy = new TransactionPolicyRelease();
295         serviceLocator.registerService(TransactionPolicySupport.class, transactionPolicy);
296         this.clusterControl = new ClusterControlImpl(this);
297         serviceLocator.registerService(ClusterControl.class, clusterControl);
298         this.clusterSetsSupport = new ClusterSetsSupportImpl2(); // Using same path as virtual graphs.
299         this.clusterSetsSupport.setReadDirectory(t.toPath());
300         this.clusterSetsSupport.updateWriteDirectory(t.toPath());
301         serviceLocator.registerService(ClusterSetsSupport.class, clusterSetsSupport);
302     }
303
304     /*
305      *
306      * Session interface
307      */
308
309
310     @Override
311     public Resource getRootLibrary() {
312         return queryProvider2.getRootLibraryResource();
313     }
314
315     void refresh(int thread, ClusterUID[] clusterUID, long csid) throws DatabaseException {
316         if (!graphSession.dbSession.refreshEnabled())
317             return;
318         try {
319             getClusterTable().refresh(csid, this, clusterUID);
320         } catch (Throwable t) {
321             Logger.defaultLogError("Refesh failed.", t);
322         }
323     }
324
325     static final class TaskHelper {
326         private final String name;
327         private Object result;
328         final Semaphore sema = new Semaphore(0);
329         private Throwable throwable = null;
330         final Consumer<DatabaseException> callback = e -> {
331             synchronized (TaskHelper.this) {
332                 throwable = e;
333             }
334         };
335         final Procedure<Object> proc = new Procedure<Object>() {
336             @Override
337             public void execute(Object result) {
338                 callback.accept(null);
339             }
340             @Override
341             public void exception(Throwable t) {
342                 if (t instanceof DatabaseException)
343                     callback.accept((DatabaseException)t);
344                 else
345                     callback.accept(new DatabaseException("" + name + "operation failed.", t));
346             }
347         };
348         final WriteTraits writeTraits = new WriteTraits() {};
349         TaskHelper(String name) {
350             this.name = name;
351         }
352         @SuppressWarnings("unchecked")
353         <T> T getResult() {
354             return (T)result;
355         }
356         void setResult(Object result) {
357             this.result = result;
358         }
359 //        Throwable throwableGet() {
360 //            return throwable;
361 //        }
362         synchronized void throwableSet(Throwable t) {
363             throwable = t;
364         }
365 //        void throwableSet(String t) {
366 //            throwable = new InternalException("" + name + " operation failed. " + t);
367 //        }
368         synchronized void throwableCheck()
369         throws DatabaseException {
370             if (null != throwable)
371                 if (throwable instanceof DatabaseException)
372                     throw (DatabaseException)throwable;
373                 else
374                     throw new DatabaseException("Undo operation failed.", throwable);
375         }
376         void throw_(String message)
377         throws DatabaseException {
378             throw new DatabaseException("" + name + " operation failed. " + message);
379         }
380     }
381
382
383
384     private ListenerBase getListenerBase(Object procedure) {
385         if (procedure instanceof ListenerBase)
386             return (ListenerBase) procedure;
387         else
388             return null;
389     }
390
391     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify) {
392         scheduleRequest(request, callback, notify, null);
393     }
394
395     /* (non-Javadoc)
396      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.Write, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
397      */
398     @Override
399     public <T> void scheduleRequest(final Write request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
400
401         assert (request != null);
402
403         if(Development.DEVELOPMENT) {
404             try {
405             if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
406                 System.err.println("schedule write '" + request + "'");
407             } catch (Throwable t) {
408                 Logger.defaultLogError(t);
409             }
410         }
411
412         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
413
414         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
415
416             @Override
417             public void run(int thread) {
418
419                 if(Development.DEVELOPMENT) {
420                     try {
421                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
422                         System.err.println("perform write '" + request + "'");
423                     } catch (Throwable t) {
424                         Logger.defaultLogError(t);
425                     }
426                 }
427
428                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
429
430                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
431
432                 try {
433
434                     flushCounter = 0;
435                     Disposable.safeDispose(clientChanges);
436                     clientChanges = new ClientChangesImpl(SessionImplSocket.this);
437
438                     VirtualGraph vg = getProvider(request.getProvider());
439
440                     WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
441                     writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
442
443                         @Override
444                         public void execute(Object result) {
445                             if(callback != null) callback.accept(null);
446                         }
447
448                         @Override
449                         public void exception(Throwable t) {
450                             if(callback != null) callback.accept((DatabaseException)t);
451                         }
452
453                     });
454
455                     assert (null != writer);
456 //                    writer.state.barrier.inc();
457
458                     try {
459                         request.perform(writer);
460                         assert (null != writer);
461                     } catch (Throwable t) {
462                         if (!(t instanceof CancelTransactionException))
463                             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", t);
464                         writeState.except(t);
465                     } finally {
466 //                        writer.state.barrier.dec();
467 //                        writer.waitAsync(request);
468                     }
469
470
471                     assert(!queryProvider2.cache.dirty);
472
473                 } catch (Throwable e) {
474
475                     // Log it first, just to be safe that the error is always logged.
476                     if (!(e instanceof CancelTransactionException))
477                     Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
478
479 //                    writeState.getGraph().state.barrier.dec();
480 //                    writeState.getGraph().waitAsync(request);
481
482                     writeState.except(e);
483
484 //                    try {
485 //                        // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
486 //                        // All we can do here is to log those, can't really pass them anywhere.
487 //                        if (callback != null) {
488 //                            if(e instanceof DatabaseException) callback.run((DatabaseException)e);
489 //                            else callback.run(new DatabaseException(e));
490 //                        }
491 //                    } catch (Throwable e2) {
492 //                        Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
493 //                    }
494 //                    boolean empty = clusterStream.reallyFlush();
495 //                    int callerThread = -1;
496 //                    ClientChangesImpl cs = new ClientChangesImpl(SessionImplSocket.this);
497 //                    SynchronizeContext context = new SynchronizeContext(callerThread, SessionImplSocket.this, cs, 1);
498 //                    try {
499 //                        // Send all local changes to server so it can calculate correct reverse change set.
500 //                        // This will call clusterStream.accept().
501 //                        state.cancelCommit(context, clusterStream);
502 //                        if (!empty) {
503 //                            if (!context.isOk()) // this is a blocking operation
504 //                                throw new InternalException("Cancel failed. This should never happen. Contact application support.");
505 //                            getQueryProvider2().performDirtyUpdates(writeState.getGraph());
506 //                        }
507 //                        state.cancelCommit2(context, clusterStream);
508 //                    } catch (DatabaseException e3) {
509 //                        Logger.defaultLogError("Write request cancel caused an unexpected error.", e3);
510 //                    } finally {
511 //                        clusterStream.setOff(false);
512 //                        clientChanges = new ClientChangesImpl(SessionImplSocket.this);
513 //                    }
514
515                 } finally {
516
517                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
518
519                 }
520
521                 task.finish();
522
523                 if(Development.DEVELOPMENT) {
524                     try {
525                     if(Development.<Boolean>getProperty(DevelopmentKeys.SESSION_LOG_WRITES, Bindings.BOOLEAN))
526                         System.err.println("finish write '" + request + "'");
527                     } catch (Throwable t) {
528                         Logger.defaultLogError(t);
529                     }
530                 }
531
532             }
533
534         }, combine);
535
536     }
537
538     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
539         scheduleRequest(request, procedure, notify, null);
540     }
541
542     /* (non-Javadoc)
543      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.WriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
544      */
545     @Override
546     public <T> void scheduleRequest(final WriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
547
548         assert (request != null);
549
550         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
551
552         requestManager.scheduleWrite(new SessionTask(request, thread) {
553
554             @Override
555             public void run(int thread) {
556
557                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
558
559                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
560
561                 flushCounter = 0;
562                 Disposable.safeDispose(clientChanges);
563                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
564
565                 VirtualGraph vg = getProvider(request.getProvider());
566                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
567
568                 try {
569                     WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, procedure);
570                     writeState = writeStateT;
571
572                     assert (null != writer);
573 //                    writer.state.barrier.inc();
574                     writeStateT.setResult(request.perform(writer));
575                     assert (null != writer);
576
577 //                    writer.state.barrier.dec();
578 //                    writer.waitAsync(null);
579
580                 } catch (Throwable e) {
581
582 //                    writer.state.barrier.dec();
583 //                    writer.waitAsync(null);
584
585                     writeState.except(e);
586
587 //                  state.stopWriteTransaction(clusterStream);
588 //
589 //              } catch (Throwable e) {
590 //                  // Log it first, just to be safe that the error is always logged.
591 //                  Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
592 //
593 //                  try {
594 //                      // Callback is client code, we have to be prepared for it to throw unexpected exceptions.
595 //                      // All we can do here is to log those, can't really pass them anywhere.
596 //                      if (procedure != null) {
597 //                          if(e instanceof DatabaseException) procedure.exception((DatabaseException)e);
598 //                          else procedure.exception(new DatabaseException(e));
599 //                      }
600 //                  } catch (Throwable e2) {
601 //                      Logger.defaultLogError("Write request callback caused an unexpected error, see exception.", e2);
602 //                  }
603 //
604 //                  clientChanges = new ClientChangesImpl(SessionImplSocket.this);
605 //
606 //                  state.stopWriteTransaction(clusterStream);
607
608                 } finally {
609                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
610                 }
611
612 //              if(notify != null) notify.release();
613
614                 task.finish();
615
616             }
617
618         }, combine);
619
620     }
621
622     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify) {
623         scheduleRequest(request, callback, notify, null);
624     }
625
626     /* (non-Javadoc)
627      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWrite, org.simantics.utils.datastructures.Callback, java.util.concurrent.Semaphore, java.lang.Boolean)
628      */
629     @Override
630     public <T> void scheduleRequest(final DelayedWrite request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
631
632         final ITask total = ThreadLogger.getInstance().begin("ScheduleDelayedWrite");
633
634         assert (request != null);
635
636         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
637
638         requestManager.scheduleWrite(new SessionTask(request, thread) {
639
640             @Override
641             public void run(int thread) {
642                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
643
644                 Procedure<Object> stateProcedure = new Procedure<Object>() {
645                     @Override
646                     public void execute(Object result) {
647                         if (callback != null)
648                             callback.accept(null);
649                     }
650                     @Override
651                     public void exception(Throwable t) {
652                         if (callback != null) {
653                             if (t instanceof DatabaseException) callback.accept((DatabaseException) t);
654                             else callback.accept(new DatabaseException(t));
655                         } else
656                             Logger.defaultLogError("Unhandled exception", t);
657                     }
658                 };
659
660                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
661                 delayedWriteState = new WriteStateBase<Object>(request, notify, stateProcedure);
662                 DelayedWriteGraph dwg = null;
663 //                newGraph.state.barrier.inc();
664
665                 try {
666                     dwg = new DelayedWriteGraph(newGraph);
667                     request.perform(dwg);
668                 } catch (Throwable e) {
669                     delayedWriteState.except(e);
670                     total.finish();
671                     dwg.close();
672                     return;
673                 } finally {
674 //                    newGraph.state.barrier.dec();
675 //                    newGraph.waitAsync(request);
676                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
677                 }
678
679                 delayedWriteState = null;
680
681                 ITask task2 = ThreadLogger.getInstance().begin("DelayedWriteCommit");
682                 fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
683
684                 flushCounter = 0;
685                 Disposable.safeDispose(clientChanges);
686                 clientChanges = new ClientChangesImpl(SessionImplSocket.this);
687
688                 acquireWriteOnly();
689
690                 VirtualGraph vg = getProvider(request.getProvider());
691                 WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
692
693                 WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
694
695                 writeState = new WriteState<Object>(writer, request, notify, stateProcedure);
696
697                 assert (null != writer);
698 //                writer.state.barrier.inc();
699
700                 try {
701                     // Cannot cancel
702                     dwg.commit(writer, request);
703
704                         if(defaultClusterSet != null) {
705                                 XSupport xs = getService(XSupport.class);
706                                 defaultClusterSet = xs.convertDelayedResourceToResource(defaultClusterSet);
707                                 ClusteringSupport cs = getService(ClusteringSupport.class);
708                                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), cs.getCluster(defaultClusterSet));
709                         }
710
711                     // This makes clusters available from server
712                     clusterStream.reallyFlush();
713
714                     releaseWriteOnly(writer);
715                     handleUpdatesAndMetadata(writer);
716
717                 } catch (ServiceException e) {
718 //                    writer.state.barrier.dec();
719 //                    writer.waitAsync(null);
720
721                     // These shall be requested from server
722                     clusterTable.removeWriteOnlyClusters();
723                     // This makes clusters available from server
724                     clusterStream.reallyFlush();
725
726                     releaseWriteOnly(writer);
727                     writeState.except(e);
728                 } finally {
729                     // Debugging & Profiling
730                     fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
731                     task2.finish();
732                     total.finish();
733                 }
734             }
735
736         }, combine);
737
738     }
739
740     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify) {
741         scheduleRequest(request, procedure, notify, null);
742     }
743
744     /* (non-Javadoc)
745      * @see fi.vtt.simantics.procore.internal.WriteRequestScheduler#scheduleRequest(org.simantics.db.request.DelayedWriteResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
746      */
747     @Override
748     public <T> void scheduleRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure, final Semaphore notify, Boolean combine) {
749         throw new Error("Not implemented");
750     }
751
752     protected ClusterImpl getNewResourceCluster() throws DatabaseException {
753         ClusterImpl cluster = clusterTable.getNewResourceCluster(clusterTranslator, graphSession, writeOnly);
754         if((serviceMode & SERVICE_MODE_CREATE) > 0) {
755             createdClusters.add(cluster.clusterId);
756         }
757         return cluster;
758     }
759
760     class WriteOnlySupport implements WriteSupport {
761
762         ClusterStream stream;
763         ClusterImpl currentCluster;
764
765         public WriteOnlySupport() {
766             this.stream = clusterStream;
767         }
768
769         @Override
770         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) throws ServiceException {
771             claim(provider, ((ResourceImpl)subject).id, ((ResourceImpl)predicate).id, ((ResourceImpl)object).id );
772         }
773
774         @Override
775         public void claim(VirtualGraph provider, int s, int p, int o) throws ServiceException {
776
777             ClusterImpl cluster = clusterTable.getClusterByResourceKey(s);
778
779             if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
780                 if(s != queryProvider2.getRootLibrary())
781                     throw new ImmutableException("Trying to modify immutable resource key=" + s);
782
783             try {
784                 maintainCluster(cluster, cluster.addRelation(s, p, o, clusterTranslator));
785             } catch (DatabaseException e) {
786                 Logger.defaultLogError(e);
787                 //throw e;
788             }
789
790             clientChanges.invalidate(s);
791
792             if (cluster.isWriteOnly())
793                 return;
794             queryProvider2.updateStatements(s, p);
795
796         }
797
798         @Override
799         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
800             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
801         }
802
803         @Override
804         public void claimValue(VirtualGraph provider, int rid, byte[] value, int length) {
805
806             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
807             try {
808                 maintainCluster(cluster, cluster.setValue(rid, value, length, clusterTranslator));
809             } catch (DatabaseException e) {
810                 Logger.defaultLogError(e);
811             }
812
813             clientChanges.invalidate(rid);
814
815             if (cluster.isWriteOnly())
816                 return;
817             queryProvider2.updateValue(rid);
818
819         }
820
821         @Override
822         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
823
824             if(amount < 65536) {
825                 claimValue(provider,resource, reader.readBytes(null, amount));
826                 return;
827             }
828
829             byte[] bytes = new byte[65536];
830
831             int rid = ((ResourceImpl)resource).id;
832             ClusterImpl cluster = clusterTable.getClusterByResourceKey(rid);
833             try {
834                 int left = amount;
835                 while(left > 0) {
836                     int block = Math.min(left, 65536);
837                     reader.readBytes(bytes, block);
838                     maintainCluster(cluster, cluster.modiValueEx(rid, amount-left, block, bytes, 0, clusterTranslator));
839                     left -= block;
840                 }
841             } catch (DatabaseException e) {
842                 Logger.defaultLogError(e);
843             }
844
845             clientChanges.invalidate(rid);
846
847             if (cluster.isWriteOnly())
848                 return;
849             queryProvider2.updateValue(rid);
850
851         }
852
853         private void maintainCluster(ClusterImpl before, ClusterI after_) {
854             if(after_ != null && after_ != before) {
855                 ClusterImpl after = (ClusterImpl)after_;
856                 if(currentCluster == before) {
857                     currentCluster = after;
858                 }
859                 clusterTable.replaceCluster(after);
860             }
861         }
862
863         public int createResourceKey(int foreignCounter) throws DatabaseException {
864             if(currentCluster == null) {
865                 currentCluster = getNewResourceCluster();
866             }
867             if(currentCluster.getNumberOfResources(clusterTranslator) == ClusterTable.CLUSTER_FILL_SIZE) {
868                 ClusterWriteOnly newCluster = (ClusterWriteOnly)getNewResourceCluster();
869                 newCluster.foreignLookup = new byte[foreignCounter];
870                 currentCluster = newCluster;
871                 if (DEBUG)
872                     System.err.println("foreignLookup " + currentCluster + " " + foreignCounter);
873             }
874             return currentCluster.createResource(clusterTranslator);
875         }
876
877         @Override
878         public Resource createResource(VirtualGraph provider) throws DatabaseException {
879             if(currentCluster == null) {
880                 if (null != defaultClusterSet) {
881                         ResourceImpl result = getNewResource(defaultClusterSet);
882                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
883                     return result;
884                 } else {
885                     currentCluster = getNewResourceCluster();
886                 }
887             }
888             if(currentCluster.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
889                 if (null != defaultClusterSet) {
890                         ResourceImpl result = getNewResource(defaultClusterSet);
891                     currentCluster = clusterTable.getClusterByResourceKey(result.id);
892                     return result;
893                 } else {
894                     currentCluster = getNewResourceCluster();
895                 }
896             }
897             return new ResourceImpl(resourceSupport, currentCluster.createResource(clusterTranslator));
898         }
899
900         @Override
901         public Resource createResource(VirtualGraph provider, long clusterId)
902         throws DatabaseException {
903             return getNewResource(clusterId);
904         }
905
906         @Override
907         public Resource createResource(VirtualGraph provider, Resource clusterSet)
908         throws DatabaseException {
909             return getNewResource(clusterSet);
910         }
911
912         @Override
913         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
914                 throws DatabaseException {
915             getNewClusterSet(clusterSet);
916         }
917
918         @Override
919         public boolean hasClusterSet(VirtualGraph dummy, Resource clusterSet)
920         throws ServiceException {
921             return containsClusterSet(clusterSet);
922         }
923
924         public void selectCluster(long cluster) {
925                 currentCluster = clusterTable.getClusterByClusterId(cluster);
926                 long setResourceId = clusterSetsSupport.getSet(cluster);
927                 clusterSetsSupport.put(setResourceId, cluster);
928         }
929
930         @Override
931         public Resource setDefaultClusterSet(Resource clusterSet)
932         throws ServiceException {
933                 Resource result = setDefaultClusterSet4NewResource(clusterSet);
934                 if(clusterSet != null) {
935                         long id = clusterSetsSupport.get(clusterSet.getResourceId());
936                         currentCluster = clusterTable.getClusterByClusterId(id);
937                         return result;
938                 } else {
939                         currentCluster = null;
940                         return null;
941                 }
942         }
943
944         @Override
945         public void denyValue(VirtualGraph provider, Resource resource) throws ServiceException {
946
947                  provider = getProvider(provider);
948              if (null == provider) {
949                  int key = ((ResourceImpl)resource).id;
950
951
952
953                  ClusterI cluster = clusterTable.getClusterProxyByResourceKey(key);
954 //                 if (cluster.getImmutable() && (serviceMode & SessionImplSocket.SERVICE_MODE_ALLOW) == 0)
955 //                      if(key != queryProvider2.getRootLibrary())
956 //                              throw new ImmutableException("Trying to modify immutable resource key=" + key);
957
958 //                 try {
959 //                     cluster.removeValue(key, clusterTranslator);
960 //                 } catch (DatabaseException e) {
961 //                     Logger.defaultLogError(e);
962 //                     return;
963 //                 }
964
965                  clusterTable.writeOnlyInvalidate(cluster);
966
967                  try {
968                          int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(key);
969                          clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.DELETE_OPERATION);
970                          clusterTranslator.removeValue(cluster);
971                  } catch (DatabaseException e) {
972                          Logger.defaultLogError(e);
973                  }
974
975                  queryProvider2.invalidateResource(key);
976                  clientChanges.invalidate(key);
977
978              } else {
979                  ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
980                  queryProvider2.updateValue(querySupport.getId(resource));
981                  clientChanges.claimValue(resource);
982              }
983
984
985         }
986
987         @Override
988         public void flush(boolean intermediate) {
989             throw new UnsupportedOperationException();
990         }
991
992         @Override
993         public void flushCluster() {
994             clusterTable.flushCluster(graphSession);
995             if(defaultClusterSet != null) {
996                 clusterSetsSupport.put(defaultClusterSet.getResourceId(), Constants.NewClusterId);
997             }
998             currentCluster = null;
999         }
1000
1001         @Override
1002         public void flushCluster(Resource r) {
1003             throw new UnsupportedOperationException("flushCluster resource " + r);
1004         }
1005
1006         @Override
1007         public void gc() {
1008         }
1009
1010         @Override
1011         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate,
1012                 Resource object) {
1013
1014                 int s = ((ResourceImpl)subject).id;
1015                 int p = ((ResourceImpl)predicate).id;
1016                 int o = ((ResourceImpl)object).id;
1017
1018                 provider = getProvider(provider);
1019                 if (null == provider) {
1020
1021                         ClusterI cluster = clusterTable.getClusterProxyByResourceKey(s);
1022                         clusterTable.writeOnlyInvalidate(cluster);
1023
1024                         try {
1025
1026                                 int sResourceKey = ClusterTraits.getResourceIndexFromResourceKey(s);
1027
1028                                 ClusterI pc = clusterTable.getClusterProxyByResourceKey(p);
1029                                 ClusterI oc = clusterTable.getClusterProxyByResourceKey(o);
1030
1031                                 clusterTranslator.addStatementIndex(cluster, sResourceKey, cluster.getClusterUID(), ClusterChange.REMOVE_OPERATION);
1032                                 clusterTranslator.addStatementIndex(cluster, p, pc.getClusterUID(), ClusterStream.NULL_OPERATION);
1033                                 clusterTranslator.addStatementIndex(cluster, o, oc.getClusterUID(), ClusterStream.NULL_OPERATION);
1034                                 clusterTranslator.removeStatement(cluster);
1035
1036                                 queryProvider2.invalidateResource(s);
1037                                 clientChanges.invalidate(s);
1038
1039                         } catch (DatabaseException e) {
1040
1041                                 Logger.defaultLogError(e);
1042
1043                         }
1044
1045                         return true;
1046
1047                 } else {
1048                         
1049                         ((VirtualGraphImpl)provider).deny(s, p, o);
1050                         queryProvider2.invalidateResource(s);
1051                         clientChanges.invalidate(s);
1052
1053                         return true;
1054
1055                 }
1056
1057         }
1058
1059         @Override
1060         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1061             throw new UnsupportedOperationException();
1062         }
1063
1064         @Override
1065         public boolean writeOnly() {
1066             return true;
1067         }
1068
1069         @Override
1070         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1071             writeSupport.performWriteRequest(graph, request);
1072         }
1073
1074         @Override
1075         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1076             throw new UnsupportedOperationException();
1077         }
1078
1079         @Override
1080         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1081             throw new UnsupportedOperationException();
1082         }
1083
1084         @Override
1085         public <T> void addMetadata(Metadata data) throws ServiceException {
1086             writeSupport.addMetadata(data);
1087         }
1088
1089         @Override
1090         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1091             return writeSupport.getMetadata(clazz);
1092         }
1093
1094         @Override
1095         public TreeMap<String, byte[]> getMetadata() {
1096             return writeSupport.getMetadata();
1097         }
1098
1099         @Override
1100         public void commitDone(WriteTraits writeTraits, long csid) {
1101             writeSupport.commitDone(writeTraits, csid);
1102         }
1103
1104         @Override
1105         public void clearUndoList(WriteTraits writeTraits) {
1106             writeSupport.clearUndoList(writeTraits);
1107         }
1108         @Override
1109         public int clearMetadata() {
1110             return writeSupport.clearMetadata();
1111         }
1112
1113                 @Override
1114                 public void startUndo() {
1115                         writeSupport.startUndo();
1116                 }
1117     }
1118
1119     class VirtualWriteOnlySupport implements WriteSupport {
1120
1121 //        @Override
1122 //        public void addStatement(VirtualGraph provider, Resource subject, Resource predicate,
1123 //                Resource object) {
1124 //            throw new UnsupportedOperationException();
1125 //        }
1126
1127         @Override
1128         public void claim(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1129
1130             TransientGraph impl = (TransientGraph)provider;
1131             impl.claim(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1132             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1133             clientChanges.claim(subject, predicate, object);
1134
1135         }
1136
1137         @Override
1138         public void claim(VirtualGraph provider, int subject, int predicate, int object) {
1139
1140             TransientGraph impl = (TransientGraph)provider;
1141             impl.claim(subject, predicate, object);
1142             getQueryProvider2().updateStatements(subject, predicate);
1143             clientChanges.claim(subject, predicate, object);
1144
1145         }
1146
1147         @Override
1148         public void claimValue(VirtualGraph provider, Resource resource, byte[] value) throws DatabaseException {
1149             claimValue(provider, ((ResourceImpl)resource).id, value, value.length);
1150         }
1151
1152         @Override
1153         public void claimValue(VirtualGraph provider, int resource, byte[] value, int length) {
1154             ((VirtualGraphImpl)provider).claimValue(resource, value, length);
1155             getQueryProvider2().updateValue(resource);
1156             clientChanges.claimValue(resource);
1157         }
1158
1159         @Override
1160         public void claimValue(VirtualGraph provider, Resource resource, ByteReader reader, int amount) throws DatabaseException {
1161             byte[] value = reader.readBytes(null, amount);
1162             claimValue(provider, resource, value);
1163         }
1164
1165         @Override
1166         public Resource createResource(VirtualGraph provider) {
1167             TransientGraph impl = (TransientGraph)provider;
1168             return impl.getResource(impl.newResource(false));
1169         }
1170
1171         @Override
1172         public Resource createResource(VirtualGraph provider, long clusterId) {
1173             throw new UnsupportedOperationException();
1174         }
1175
1176         @Override
1177         public Resource createResource(VirtualGraph provider, Resource clusterSet)
1178         throws DatabaseException {
1179             throw new UnsupportedOperationException();
1180         }
1181
1182         @Override
1183         public void createClusterSet(VirtualGraph provider, Resource clusterSet)
1184         throws DatabaseException {
1185             throw new UnsupportedOperationException();
1186         }
1187
1188         @Override
1189         public boolean hasClusterSet(VirtualGraph provider, Resource clusterSet)
1190         throws ServiceException {
1191             throw new UnsupportedOperationException();
1192         }
1193
1194         @Override
1195         public Resource setDefaultClusterSet(Resource clusterSet)
1196         throws ServiceException {
1197                 return null;
1198         }
1199
1200         @Override
1201         public void denyValue(VirtualGraph provider, Resource resource) {
1202             ((VirtualGraphImpl)provider).denyValue(((ResourceImpl) resource).id);
1203             getQueryProvider2().updateValue(querySupport.getId(resource));
1204             // NOTE: this only keeps track of value changes by-resource.
1205             clientChanges.claimValue(resource);
1206         }
1207
1208         @Override
1209         public void flush(boolean intermediate) {
1210             throw new UnsupportedOperationException();
1211         }
1212
1213         @Override
1214         public void flushCluster() {
1215             throw new UnsupportedOperationException();
1216         }
1217
1218         @Override
1219         public void flushCluster(Resource r) {
1220             throw new UnsupportedOperationException("Resource " + r);
1221         }
1222
1223         @Override
1224         public void gc() {
1225         }
1226
1227         @Override
1228         public boolean removeStatement(VirtualGraph provider, Resource subject, Resource predicate, Resource object) {
1229             TransientGraph impl = (TransientGraph) provider;
1230             impl.deny(querySupport.getId(subject), querySupport.getId(predicate), querySupport.getId(object));
1231             getQueryProvider2().updateStatements(querySupport.getId(subject), querySupport.getId(predicate));
1232             clientChanges.deny(subject, predicate, object);
1233             return true;
1234         }
1235
1236         @Override
1237         public void setValue(VirtualGraph provider, Resource resource, byte[] value) {
1238             throw new UnsupportedOperationException();
1239         }
1240
1241         @Override
1242         public boolean writeOnly() {
1243             return true;
1244         }
1245
1246         @Override
1247         public void performWriteRequest(WriteGraph graph, Write request) throws DatabaseException {
1248             throw new UnsupportedOperationException();
1249         }
1250
1251         @Override
1252         public <T> T performWriteRequest(WriteGraph graph, WriteResult<T> request) throws DatabaseException {
1253             throw new UnsupportedOperationException();
1254         }
1255
1256         @Override
1257         public void performWriteRequest(WriteGraph graph, WriteOnly request) throws DatabaseException {
1258             throw new UnsupportedOperationException();
1259         }
1260
1261         @Override
1262         public <T> void addMetadata(Metadata data) throws ServiceException {
1263             throw new UnsupportedOperationException();
1264         }
1265
1266         @Override
1267         public <T extends Metadata> T getMetadata(Class<T> clazz) throws ServiceException {
1268             throw new UnsupportedOperationException();
1269         }
1270
1271         @Override
1272         public TreeMap<String, byte[]> getMetadata() {
1273             throw new UnsupportedOperationException();
1274         }
1275
1276         @Override
1277         public void commitDone(WriteTraits writeTraits, long csid) {
1278         }
1279
1280         @Override
1281         public void clearUndoList(WriteTraits writeTraits) {
1282         }
1283
1284         @Override
1285         public int clearMetadata() {
1286             return 0;
1287         }
1288
1289                 @Override
1290                 public void startUndo() {
1291                 }
1292     }
1293
1294     private <T> void performWriteOnly(WriteOnlyResult<T> request, Semaphore notify, Procedure<T> callback) {
1295
1296         try {
1297
1298             //int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1299
1300             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1301
1302             flushCounter = 0;
1303             Disposable.safeDispose(clientChanges);
1304             clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1305
1306             acquireWriteOnly();
1307
1308             WriteSupport writeSupport = request.getProvider() != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1309
1310             WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, request.getProvider());
1311
1312             WriteState<T> writeStateT = new WriteState<T>(writer, request, notify, callback);
1313             writeState = writeStateT;
1314
1315             assert (null != writer);
1316 //            writer.state.barrier.inc();
1317             long start = System.nanoTime();
1318             T result = request.perform(writer);
1319             long duration = System.nanoTime() - start;
1320             if (DEBUG) {
1321                 System.err.println("################");
1322                 System.err.println("WriteOnly duration " + 1e-9*duration);
1323             }
1324             writeStateT.setResult(result);
1325
1326             // This makes clusters available from server
1327             clusterStream.reallyFlush();
1328
1329             // This will trigger query updates
1330             releaseWriteOnly(writer);
1331             assert (null != writer);
1332
1333             handleUpdatesAndMetadata(writer);
1334
1335         } catch (CancelTransactionException e) {
1336
1337             releaseWriteOnly(writeState.getGraph());
1338
1339             clusterTable.removeWriteOnlyClusters();
1340             state.stopWriteTransaction(clusterStream);
1341
1342         } catch (Throwable e) {
1343
1344             e.printStackTrace();
1345
1346             releaseWriteOnly(writeState.getGraph());
1347
1348             clusterTable.removeWriteOnlyClusters();
1349
1350             if (callback != null)
1351                 callback.exception(new DatabaseException(e));
1352
1353             state.stopWriteTransaction(clusterStream);
1354             Logger.defaultLogError("Write transaction caused an unexpected error, see exception.", e);
1355
1356         } finally  {
1357
1358             fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1359
1360         }
1361
1362     }
1363
1364     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify) {
1365         scheduleRequest(request, callback, notify, null);
1366     }
1367
1368     @Override
1369     public <T> void scheduleRequest(final WriteOnly request, final Consumer<DatabaseException> callback, final Semaphore notify, Boolean combine) {
1370
1371         assertAlive();
1372
1373         assert (request != null);
1374
1375         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1376
1377         requestManager.scheduleWrite(new SessionTask(request, thread, thread) {
1378
1379             @Override
1380             public void run(int thread) {
1381
1382                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1383
1384                     try {
1385
1386                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1387
1388                         flushCounter = 0;
1389                         Disposable.safeDispose(clientChanges);
1390                         clientChanges = new ClientChangesImpl(SessionImplSocket.this);
1391
1392                         acquireWriteOnly();
1393
1394                         VirtualGraph vg = getProvider(request.getProvider());
1395                         WriteSupport writeSupport = vg != null ? new VirtualWriteOnlySupport() : new WriteOnlySupport();
1396
1397                         WriteGraphImpl writer = WriteGraphImpl.create(getQueryProvider2(), writeSupport, vg);
1398
1399                         writeState = new WriteState<Object>(writer, request, notify, new Procedure<Object>() {
1400
1401                             @Override
1402                             public void execute(Object result) {
1403                                 if(callback != null) callback.accept(null);
1404                             }
1405
1406                             @Override
1407                             public void exception(Throwable t) {
1408                                 if(callback != null) callback.accept((DatabaseException)t);
1409                             }
1410
1411                         });
1412
1413                         assert (null != writer);
1414 //                        writer.state.barrier.inc();
1415
1416                         try {
1417
1418                             request.perform(writer);
1419
1420                         } catch (Throwable e) {
1421
1422 //                            writer.state.barrier.dec();
1423 //                            writer.waitAsync(null);
1424
1425                             releaseWriteOnly(writer);
1426
1427                             clusterTable.removeWriteOnlyClusters();
1428
1429                             if(!(e instanceof CancelTransactionException)) {
1430                                 if (callback != null)
1431                                     callback.accept(new DatabaseException(e));
1432                             }
1433
1434                             writeState.except(e);
1435
1436                             return;
1437
1438                         }
1439
1440                         // This makes clusters available from server
1441                         boolean empty = clusterStream.reallyFlush();
1442                         // This was needed to make WO requests call metadata listeners.
1443                         // NOTE: the calling event does not contain clientChanges information.
1444                         if (!empty && clientChanges.isEmpty())
1445                             clientChanges.setNotEmpty(true);
1446                         releaseWriteOnly(writer);
1447                         assert (null != writer);
1448                     } finally  {
1449
1450                         fireSessionVariableChange(SessionVariables.QUEUED_WRITES);
1451
1452                     }
1453
1454
1455                 task.finish();
1456
1457             }
1458
1459         }, combine);
1460
1461     }
1462
1463     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify) {
1464         scheduleRequest(request, callback, notify, null);
1465     }
1466
1467     /* (non-Javadoc)
1468      * @see org.simantics.db.impl.support.WriteRequestScheduleSupport#scheduleRequest(org.simantics.db.request.WriteOnlyResult, org.simantics.db.procedure.Procedure, java.util.concurrent.Semaphore, java.lang.Boolean)
1469      */
1470     @Override
1471     public <T> void scheduleRequest(final WriteOnlyResult<T> request, final Procedure<T> callback, final Semaphore notify, Boolean combine) {
1472
1473         assert (request != null);
1474
1475         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1476
1477         requestManager.scheduleWrite(new SessionTask(request, thread) {
1478
1479             @Override
1480             public void run(int thread) {
1481
1482                 ITask task = ThreadLogger.getInstance().begin("WriteRequest " + request);
1483
1484                 performWriteOnly(request, notify, callback);
1485
1486                 task.finish();
1487
1488             }
1489
1490         }, combine);
1491
1492     }
1493
1494     public <T> void scheduleRequest(final Read<T> request, final AsyncProcedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1495
1496         assert (request != null);
1497         assert (procedure != null);
1498
1499         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1500
1501         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1502
1503             @Override
1504             public void run(int thread) {
1505
1506                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1507
1508                 ListenerBase listener = getListenerBase(procedure);
1509
1510                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1511
1512                 try {
1513
1514                     if (listener != null) {
1515
1516                         try {
1517                                 
1518                                 AsyncProcedure ap = new AsyncProcedure<T>() {
1519
1520                                         @Override
1521                                         public void exception(AsyncReadGraph graph, Throwable t) {
1522                                                 procedure.exception(graph, t);
1523                                                 if(throwable != null) {
1524                                                         throwable.set(t);
1525                                                 } else {
1526                                                         //                                    ErrorLogger.defaultLogError("Unhandled exception", t);
1527                                                 }
1528                                         }
1529
1530                                         @Override
1531                                         public void execute(AsyncReadGraph graph, T t) {
1532                                                 if(result != null) result.set(t);
1533                                                 procedure.execute(graph, t);
1534                                         }
1535
1536                                 };
1537                                 
1538                                 QueryCache.runnerReadEntry(newGraph, request, null, listener, ap);
1539                                 
1540                         } catch (Throwable t) {
1541                             // This is handled by the AsyncProcedure
1542                                 //Logger.defaultLogError("Internal error", t);
1543                         }
1544
1545                     } else {
1546
1547                         try {
1548
1549 //                            newGraph.state.barrier.inc();
1550
1551                             T t = request.perform(newGraph);
1552
1553                             try {
1554
1555                                 if(result != null) result.set(t);
1556                                 procedure.execute(newGraph, t);
1557
1558                             } catch (Throwable th) {
1559
1560                                 if(throwable != null) {
1561                                     throwable.set(th);
1562                                 } else {
1563                                     Logger.defaultLogError("Unhandled exception", th);
1564                                 }
1565
1566                             }
1567
1568                         } catch (Throwable t) {
1569
1570                             if (DEBUG)
1571                                 t.printStackTrace();
1572
1573                             if(throwable != null) {
1574                                 throwable.set(t);
1575                             } else {
1576                                 Logger.defaultLogError("Unhandled exception", t);
1577                             }
1578
1579                             try {
1580
1581                                 procedure.exception(newGraph, t);
1582
1583                             } catch (Throwable t2) {
1584
1585                                 if(throwable != null) {
1586                                     throwable.set(t2);
1587                                 } else {
1588                                     Logger.defaultLogError("Unhandled exception", t2);
1589                                 }
1590
1591                             }
1592
1593                         }
1594
1595 //                        newGraph.state.barrier.dec();
1596 //                        newGraph.waitAsync(request);
1597
1598                     }
1599
1600                 } finally {
1601
1602                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1603
1604                 }
1605
1606             }
1607
1608         });
1609
1610     }
1611
1612     public <T> void scheduleRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure, final ListenerBase listener, final Semaphore notify) {
1613
1614         assert (request != null);
1615         assert (procedure != null);
1616
1617         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1618
1619         requestManager.scheduleRead(new SessionRead(request, null, notify, thread) {
1620
1621             @Override
1622             public void run(int thread) {
1623
1624                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1625
1626                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1627
1628                 try {
1629
1630                     if (listener != null) {
1631
1632                         try {
1633                                 QueryCache.runnerAsyncReadEntry(newGraph, request, null, listener, procedure);
1634                                                         //newGraph.processor.query(newGraph, request, null, procedure, listener);
1635                                                 } catch (DatabaseException e) {
1636                                                         Logger.defaultLogError(e);
1637                                                 }
1638
1639                     } else {
1640
1641                         final ResultCallWrappedSingleQueryProcedure4<T> wrapper = new ResultCallWrappedSingleQueryProcedure4<T>(
1642                                 procedure, "request");
1643
1644                         try {
1645
1646 //                            newGraph.state.barrier.inc();
1647
1648                             request.perform(newGraph, wrapper);
1649
1650 //                            newGraph.waitAsync(request);
1651
1652                         } catch (Throwable t) {
1653
1654                             wrapper.exception(newGraph, t);
1655 //                            newGraph.waitAsync(request);
1656
1657
1658                         }
1659
1660                     }
1661
1662                 } finally {
1663
1664                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1665
1666                 }
1667
1668             }
1669
1670         });
1671
1672     }
1673
1674     public <T> void scheduleRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure, final Semaphore notify) {
1675
1676         assert (request != null);
1677         assert (procedure != null);
1678
1679         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1680
1681         int sync = notify != null ? thread : -1;
1682
1683         requestManager.scheduleRead(new SessionRead(request, null, notify, thread, sync) {
1684
1685             @Override
1686             public void run(int thread) {
1687
1688                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1689
1690                 ListenerBase listener = getListenerBase(procedure);
1691
1692                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1693
1694                 try {
1695
1696                     if (listener != null) {
1697
1698                         newGraph.processor.query(newGraph, request, null, procedure, listener);
1699
1700 //                        newGraph.waitAsync(request);
1701
1702                     } else {
1703
1704                         final ResultCallWrappedQueryProcedure4<T> wrapper = new ResultCallWrappedQueryProcedure4<T>(procedure);
1705
1706                         try {
1707
1708                             request.perform(newGraph, wrapper);
1709
1710                         } catch (Throwable t) {
1711
1712                             t.printStackTrace();
1713
1714                         }
1715
1716                     }
1717
1718                 } finally {
1719
1720                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1721
1722                 }
1723
1724             }
1725
1726         });
1727
1728     }
1729
1730     public <T> void scheduleRequest(final ExternalRead<T> request, final Procedure<T> procedure, final Semaphore notify, final DataContainer<Throwable> throwable, final DataContainer<T> result) {
1731
1732         assert (request != null);
1733         assert (procedure != null);
1734
1735         int thread = request.hashCode() & queryProvider2.THREAD_MASK;
1736
1737         requestManager.scheduleRead(new SessionRead(request, throwable, notify, thread, thread) {
1738
1739             @Override
1740             public void run(int thread) {
1741
1742                 fireSessionVariableChange(SessionVariables.QUEUED_READS);
1743
1744                 ListenerBase listener = getListenerBase(procedure);
1745
1746                 final ReadGraphImpl newGraph = ReadGraphImpl.create(getQueryProvider2());
1747
1748                 try {
1749
1750                     if (listener != null) {
1751
1752                         try {
1753                             QueryCacheBase.resultExternalReadEntry(newGraph, request, null, listener, procedure);
1754                         } catch (DatabaseException e) {
1755                             Logger.defaultLogError(e);
1756                         }
1757                         
1758                         
1759 //                        newGraph.processor.query(newGraph, request, null, new Procedure<T>() {
1760 //
1761 //                            @Override
1762 //                            public void exception(Throwable t) {
1763 //                                procedure.exception(t);
1764 //                                if(throwable != null) {
1765 //                                    throwable.set(t);
1766 //                                }
1767 //                            }
1768 //
1769 //                            @Override
1770 //                            public void execute(T t) {
1771 //                                if(result != null) result.set(t);
1772 //                                procedure.execute(t);
1773 //                            }
1774 //
1775 //                        }, listener);
1776
1777 //                        newGraph.waitAsync(request);
1778
1779                     } else {
1780
1781 //                        newGraph.state.barrier.inc();
1782
1783                         request.register(newGraph, new Listener<T>() {
1784
1785                             @Override
1786                             public void exception(Throwable t) {
1787                                 if(throwable != null) throwable.set(t);
1788                                 procedure.exception(t);
1789 //                                newGraph.state.barrier.dec();
1790                             }
1791
1792                             @Override
1793                             public void execute(T t) {
1794                                 if(result != null) result.set(t);
1795                                 procedure.execute(t);
1796 //                                newGraph.state.barrier.dec();
1797                             }
1798
1799                             @Override
1800                             public boolean isDisposed() {
1801                                 return true;
1802                             }
1803
1804                         });
1805
1806 //                        newGraph.waitAsync(request);
1807
1808                     }
1809
1810                 } finally {
1811
1812                     fireSessionVariableChange(SessionVariables.QUEUED_READS);
1813
1814                 }
1815
1816             }
1817
1818         });
1819
1820     }
1821
1822
1823     @Override
1824     public <T> void asyncRequest(final Read<T> request, final AsyncProcedure<T> procedure) {
1825
1826         scheduleRequest(request, procedure, null, null, null);
1827
1828     }
1829
1830     @Override
1831     public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1832         assertNotSession();
1833         Semaphore notify = new Semaphore(0);
1834         DataContainer<Throwable> container = new DataContainer<Throwable>();
1835         DataContainer<T> result = new DataContainer<T>();
1836         scheduleRequest(request, procedure, notify, container, result);
1837         acquire(notify, request);
1838         Throwable throwable = container.get();
1839         if(throwable != null) {
1840             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1841             else throw new DatabaseException("Unexpected exception", throwable);
1842         }
1843         return result.get();
1844     }
1845
1846     @Override
1847     public <T> T syncRequest(AsyncRead<T> request, final AsyncProcedure<T> procedure) throws DatabaseException {
1848         assertNotSession();
1849         Semaphore notify = new Semaphore(0);
1850         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1851         final DataContainer<T> resultContainer = new DataContainer<T>();
1852         scheduleRequest(request, new AsyncProcedure<T>() {
1853             @Override
1854             public void exception(AsyncReadGraph graph, Throwable throwable) {
1855                 exceptionContainer.set(throwable);
1856                 procedure.exception(graph, throwable);
1857             }
1858             @Override
1859             public void execute(AsyncReadGraph graph, T result) {
1860                 resultContainer.set(result);
1861                 procedure.execute(graph, result);
1862             }
1863         }, getListenerBase(procedure), notify);
1864         acquire(notify, request, procedure);
1865         Throwable throwable = exceptionContainer.get();
1866         if (throwable != null) {
1867             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1868             else throw new DatabaseException("Unexpected exception", throwable);
1869         }
1870         return resultContainer.get();
1871     }
1872
1873
1874     @Override
1875     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) throws DatabaseException {
1876         assertNotSession();
1877         Semaphore notify = new Semaphore(0);
1878         final DataContainer<Throwable> exceptionContainer = new DataContainer<Throwable>();
1879         scheduleRequest(request, new AsyncMultiProcedure<T>() {
1880             @Override
1881             public void exception(AsyncReadGraph graph, Throwable throwable) {
1882                 exceptionContainer.set(throwable);
1883                 procedure.exception(graph, throwable);
1884             }
1885             @Override
1886             public void execute(AsyncReadGraph graph, T result) {
1887                 procedure.execute(graph, result);
1888             }
1889             @Override
1890             public void finished(AsyncReadGraph graph) {
1891                 procedure.finished(graph);
1892             }
1893         }, notify);
1894         acquire(notify, request, procedure);
1895         Throwable throwable = exceptionContainer.get();
1896         if (throwable != null) {
1897             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1898             else throw new DatabaseException("Unexpected exception", throwable);
1899         }
1900         // TODO: implement return value
1901         System.err.println("TODO: implement return value for syncRequest(AsyncMultiRead, AsyncMultiProcedure)");
1902         return null;
1903     }
1904
1905     @Override
1906     public <T> T syncRequest(final ExternalRead<T> request) throws DatabaseException {
1907         return syncRequest(request, new ProcedureAdapter<T>());
1908
1909
1910 //        assert(request != null);
1911 //
1912 //        final DataContainer<T> result = new DataContainer<T>();
1913 //        final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1914 //
1915 //        syncRequest(request, new Procedure<T>() {
1916 //
1917 //            @Override
1918 //            public void execute(T t) {
1919 //                result.set(t);
1920 //            }
1921 //
1922 //            @Override
1923 //            public void exception(Throwable t) {
1924 //                exception.set(t);
1925 //            }
1926 //
1927 //        });
1928 //
1929 //        Throwable t = exception.get();
1930 //        if(t != null) {
1931 //            if(t instanceof DatabaseException) throw (DatabaseException)t;
1932 //            else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(PrimitiveRead)", t);
1933 //        }
1934 //
1935 //        return result.get();
1936
1937     }
1938
1939     @Override
1940     public <T> T syncRequest(ExternalRead<T> request, final Listener<T> procedure) throws DatabaseException {
1941         return syncRequest(request, (Procedure<T>)procedure);
1942     }
1943
1944
1945 //    @Override
1946 //    public <T> T syncRequest(Read<T> request, AsyncProcedure<T> procedure) throws DatabaseException {
1947 //        assertNotSession();
1948 //        Semaphore notify = new Semaphore(0);
1949 //        DataContainer<Throwable> container = new DataContainer<Throwable>();
1950 //        DataContainer<T> result = new DataContainer<T>();
1951 //        scheduleRequest(request, procedure, notify, container, result);
1952 //        acquire(notify, request);
1953 //        Throwable throwable = container.get();
1954 //        if(throwable != null) {
1955 //            if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1956 //            else throw new DatabaseException("Unexpected exception", throwable);
1957 //        }
1958 //        return result.get();
1959 //    }
1960
1961
1962     @Override
1963     public <T> T syncRequest(ExternalRead<T> request, final Procedure<T> procedure) throws DatabaseException {
1964         assertNotSession();
1965         Semaphore notify = new Semaphore(0);
1966         final DataContainer<Throwable> container = new DataContainer<Throwable>();
1967         final DataContainer<T> result = new DataContainer<T>();
1968         scheduleRequest(request, procedure, notify, container, result);
1969         acquire(notify, request);
1970         Throwable throwable = container.get();
1971         if (throwable != null) {
1972             if(throwable instanceof DatabaseException) throw (DatabaseException)throwable;
1973             else throw new DatabaseException("Unexpected exception", throwable);
1974         }
1975         return result.get();
1976     }
1977
1978     @Override
1979     public void syncRequest(Write request) throws DatabaseException  {
1980         assertNotSession();
1981         assertAlive();
1982         Semaphore notify = new Semaphore(0);
1983         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
1984         scheduleRequest(request, e -> exception.set(e), notify);
1985         acquire(notify, request);
1986         if(exception.get() != null) throw exception.get();
1987     }
1988
1989     @Override
1990     public <T> T syncRequest(WriteResult<T> request) throws DatabaseException  {
1991         assertNotSession();
1992         Semaphore notify = new Semaphore(0);
1993         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
1994         final DataContainer<T> result = new DataContainer<T>();
1995         scheduleRequest(request, new Procedure<T>() {
1996
1997             @Override
1998             public void exception(Throwable t) {
1999               exception.set(t);
2000             }
2001
2002             @Override
2003             public void execute(T t) {
2004               result.set(t);
2005             }
2006
2007         }, notify);
2008         acquire(notify, request);
2009         if(exception.get() != null) {
2010             Throwable t = exception.get();
2011             if(t instanceof DatabaseException) throw (DatabaseException)t;
2012             else throw new DatabaseException(t);
2013         }
2014         return result.get();
2015     }
2016
2017     @Override
2018     public <T> T syncRequest(WriteOnlyResult<T> request) throws DatabaseException  {
2019         assertNotSession();
2020         Semaphore notify = new Semaphore(0);
2021         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2022         final DataContainer<T> result = new DataContainer<T>();
2023         scheduleRequest(request, new Procedure<T>() {
2024
2025             @Override
2026             public void exception(Throwable t) {
2027               exception.set(t);
2028             }
2029
2030             @Override
2031             public void execute(T t) {
2032               result.set(t);
2033             }
2034
2035         }, notify);
2036         acquire(notify, request);
2037         if(exception.get() != null) {
2038             Throwable t = exception.get();
2039             if(t instanceof DatabaseException) throw (DatabaseException)t;
2040             else throw new DatabaseException(t);
2041         }
2042         return result.get();
2043     }
2044
2045     @Override
2046     public <T> T syncRequest(DelayedWriteResult<T> request) throws DatabaseException  {
2047         assertNotSession();
2048         Semaphore notify = new Semaphore(0);
2049         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2050         final DataContainer<T> result = new DataContainer<T>();
2051         scheduleRequest(request, new Procedure<T>() {
2052
2053             @Override
2054             public void exception(Throwable t) {
2055               exception.set(t);
2056             }
2057
2058             @Override
2059             public void execute(T t) {
2060               result.set(t);
2061             }
2062
2063         }, notify);
2064         acquire(notify, request);
2065         if(exception.get() != null) {
2066             Throwable t = exception.get();
2067             if(t instanceof DatabaseException) throw (DatabaseException)t;
2068             else throw new DatabaseException(t);
2069         }
2070         return result.get();
2071     }
2072
2073     @Override
2074     public void syncRequest(DelayedWrite request) throws DatabaseException  {
2075         assertNotSession();
2076         Semaphore notify = new Semaphore(0);
2077         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2078         scheduleRequest(request, e -> exception.set(e), notify);
2079         acquire(notify, request);
2080         if(exception.get() != null) throw exception.get();
2081     }
2082
2083     @Override
2084     public void syncRequest(WriteOnly request) throws DatabaseException  {
2085         assertNotSession();
2086         assertAlive();
2087         Semaphore notify = new Semaphore(0);
2088         final DataContainer<DatabaseException> exception = new DataContainer<DatabaseException>();
2089         scheduleRequest(request, e -> exception.set(e), notify);
2090         acquire(notify, request);
2091         if(exception.get() != null) throw exception.get();
2092     }
2093
2094     /*
2095      *
2096      * GraphRequestProcessor interface
2097      */
2098
2099     @Override
2100     public <T> void asyncRequest(final AsyncRead<T> request, final AsyncProcedure<T> procedure) {
2101
2102         scheduleRequest(request, procedure, null, null);
2103
2104     }
2105
2106     @Override
2107     public <T> void asyncRequest(final AsyncMultiRead<T> request, final AsyncMultiProcedure<T> procedure) {
2108
2109         scheduleRequest(request, procedure, null);
2110
2111     }
2112
2113     @Override
2114     public <T> void asyncRequest(final ExternalRead<T> request, final Procedure<T> procedure) {
2115
2116         scheduleRequest(request, procedure, null, null, null);
2117
2118     }
2119
2120     @Override
2121     public void asyncRequest(final Write request, final Consumer<DatabaseException> callback) {
2122
2123         scheduleRequest(request, callback, null);
2124
2125     }
2126
2127     @Override
2128     public <T> void asyncRequest(final WriteResult<T> request, final Procedure<T> procedure) {
2129
2130         scheduleRequest(request, procedure, null);
2131
2132     }
2133
2134     @Override
2135     public <T> void asyncRequest(final WriteOnlyResult<T> request, final Procedure<T> procedure) {
2136
2137         scheduleRequest(request, procedure, null);
2138
2139     }
2140
2141     @Override
2142     public <T> void asyncRequest(final DelayedWriteResult<T> request, final Procedure<T> procedure) {
2143
2144         scheduleRequest(request, procedure, null);
2145
2146     }
2147
2148     @Override
2149     public void asyncRequest(final DelayedWrite request, final Consumer<DatabaseException> callback) {
2150
2151         scheduleRequest(request, callback, null);
2152
2153     }
2154
2155     @Override
2156     public void asyncRequest(final Write r) {
2157         asyncRequest(r, null);
2158     }
2159
2160     @Override
2161     public void asyncRequest(final DelayedWrite r) {
2162         asyncRequest(r, null);
2163     }
2164
2165     @Override
2166     public void asyncRequest(final WriteOnly request, final Consumer<DatabaseException> callback) {
2167
2168         scheduleRequest(request, callback, null);
2169
2170     }
2171
2172     @Override
2173     public void asyncRequest(final WriteOnly request) {
2174
2175         asyncRequest(request, null);
2176
2177     }
2178
2179     @Override
2180     public <T> void async(ReadInterface<T> r, AsyncProcedure<T> procedure) {
2181         r.request(this, procedure);
2182     }
2183
2184     @Override
2185     public <T> void async(ReadInterface<T> r, Procedure<T> procedure) {
2186         r.request(this, procedure);
2187     }
2188
2189     @Override
2190     public <T> void async(ReadInterface<T> r, SyncProcedure<T> procedure) {
2191         r.request(this, procedure);
2192     }
2193
2194     @Override
2195     public <T> void async(ReadInterface<T> r, AsyncListener<T> procedure) {
2196         r.request(this, procedure);
2197     }
2198
2199     @Override
2200     public <T> void async(ReadInterface<T> r, Listener<T> procedure) {
2201         r.request(this, procedure);
2202     }
2203
2204     @Override
2205     public <T> void async(ReadInterface<T> r, SyncListener<T> procedure) {
2206         r.request(this, procedure);
2207     }
2208
2209     @Override
2210     public <T> T sync(ReadInterface<T> r) throws DatabaseException {
2211         return r.request(this);
2212     }
2213
2214     @Override
2215     public <T> T sync(WriteInterface<T> r) throws DatabaseException {
2216         return r.request(this);
2217     }
2218
2219     @Override
2220     public <T> void async(WriteInterface<T> r, Procedure<T> procedure) {
2221         r.request(this, procedure);
2222     }
2223
2224     @Override
2225     public <T> void async(WriteInterface<T> r) {
2226         r.request(this, new ProcedureAdapter<T>());
2227     }
2228
2229     //@Override
2230     public void incAsync() {
2231         state.incAsync();
2232     }
2233
2234     //@Override
2235     public void decAsync() {
2236         state.decAsync();
2237     }
2238
2239     public long getCluster(ResourceImpl resource) {
2240         ClusterI cluster = clusterTable.getClusterByResourceKey(resource.id);
2241         return cluster.getClusterId();
2242     }
2243
2244     public long getCluster(int id) {
2245         if (clusterTable == null)
2246             System.out.println("SessionImplSocket.getCluster() clusterTable == null !!!!! how come");
2247         return clusterTable.getClusterIdByResourceKeyNoThrow(id);
2248     }
2249
2250     public ResourceImpl getResource(int id) {
2251         return new ResourceImpl(resourceSupport, id);
2252     }
2253
2254     public ResourceImpl getResource(int resourceIndex, long clusterId) {
2255         assert (!ClusterTraitsBase.isIllegalResourceIndex(resourceIndex));
2256         ClusterI proxy = clusterTable.getClusterByClusterId(clusterId);
2257         int key = proxy.getClusterKey();
2258         int resourceKey = ClusterTraitsBase.createResourceKeyNoThrow(key, resourceIndex);
2259         return new ResourceImpl(resourceSupport, resourceKey);
2260     }
2261
2262     public ResourceImpl getResource2(int id) {
2263         assert (id != 0);
2264         return new ResourceImpl(resourceSupport, id);
2265     }
2266
2267     final public int getId(ResourceImpl impl) {
2268         return impl.id;
2269     }
2270
2271     public static final Charset UTF8 = Charset.forName("utf-8");
2272
2273
2274
2275
2276     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject) {
2277         for(TransientGraph g : support.providers) {
2278             if(g.isPending(subject)) return false;
2279         }
2280         return true;
2281     }
2282
2283     static boolean areVirtualStatementsLoaded(final VirtualGraphServerSupportImpl support, int subject, int predicate) {
2284         for(TransientGraph g : support.providers) {
2285             if(g.isPending(subject, predicate)) return false;
2286         }
2287         return true;
2288     }
2289
2290     static void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, final Consumer<ReadGraphImpl> runnable) {
2291
2292         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2293
2294             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2295
2296             @Override
2297             public void accept(ReadGraphImpl graph) {
2298                 if(ready.decrementAndGet() == 0) {
2299                     runnable.accept(graph);
2300                 }
2301             }
2302
2303         };
2304
2305         for(TransientGraph g : support.providers) {
2306             if(g.isPending(subject)) {
2307                 try {
2308                     g.load(graph, subject, composite);
2309                 } catch (DatabaseException e) {
2310                     e.printStackTrace();
2311                 }
2312             } else {
2313                 composite.accept(graph);
2314             }
2315         }
2316
2317         composite.accept(graph);
2318
2319     }
2320
2321     static  void loadVirtualStatements(final VirtualGraphServerSupportImpl support, ReadGraphImpl graph, int subject, int predicate, final Consumer<ReadGraphImpl> runnable) {
2322
2323         Consumer<ReadGraphImpl> composite = new Consumer<ReadGraphImpl>() {
2324
2325             AtomicInteger ready = new AtomicInteger(support.providers.size() + 1);
2326
2327             @Override
2328             public void accept(ReadGraphImpl graph) {
2329                 if(ready.decrementAndGet() == 0) {
2330                     runnable.accept(graph);
2331                 }
2332             }
2333
2334         };
2335
2336         for(TransientGraph g : support.providers) {
2337             if(g.isPending(subject, predicate)) {
2338                 try {
2339                     g.load(graph, subject, predicate, composite);
2340                 } catch (DatabaseException e) {
2341                     e.printStackTrace();
2342                 }
2343             } else {
2344                 composite.accept(graph);
2345             }
2346         }
2347
2348         composite.accept(graph);
2349
2350     }
2351
2352 //    void dumpHeap() {
2353 //
2354 //        try {
2355 //            ManagementFactory.newPlatformMXBeanProxy(ManagementFactory.getPlatformMBeanServer(),
2356 //                    "com.sun.management:type=HotSpotDiagnostic", HotSpotDiagnosticMXBean.class).dumpHeap(
2357 //                    "d:/heap" + flushCounter + ".txt", true);
2358 //        } catch (IOException e) {
2359 //            e.printStackTrace();
2360 //        }
2361 //
2362 //    }
2363
2364     void fireReactionsToSynchronize(ChangeSet cs) {
2365
2366         // Do not fire empty events
2367         if (cs.isEmpty())
2368             return;
2369
2370         ReadGraphImpl g = ReadGraphImpl.create(getQueryProvider2());
2371 //        g.state.barrier.inc();
2372
2373         try {
2374
2375             if (!cs.isEmpty()) {
2376                 ChangeEvent e2 = new ChangeEvent(g.getSession(), g, null, cs);
2377                 for (ChangeListener l : changeListeners2) {
2378                     try {
2379                         l.graphChanged(e2);
2380                     } catch (Exception ex) {
2381                         ex.printStackTrace();
2382                     }
2383                 }
2384             }
2385
2386         } finally {
2387
2388 //            g.state.barrier.dec();
2389 //            g.waitAsync(null);
2390
2391         }
2392
2393     }
2394
2395     void fireReactionsToCommit(ReadGraphImpl graph, ChangeSet cs2) {
2396         try {
2397
2398             // Do not fire empty events
2399             if (cs2.isEmpty())
2400                 return;
2401
2402 //            graph.restart();
2403 //            graph.state.barrier.inc();
2404
2405             try {
2406
2407                 if (!cs2.isEmpty()) {
2408
2409                     ChangeEvent e2 = new ChangeEvent(graph.getSession(), graph, null, cs2);
2410                     for (ChangeListener l : changeListeners2) {
2411                         try {
2412                             l.graphChanged(e2);
2413 //                            System.out.println("changelistener " + l);
2414                         } catch (Exception ex) {
2415                             ex.printStackTrace();
2416                         }
2417                     }
2418
2419                 }
2420
2421             } finally {
2422
2423 //                graph.state.barrier.dec();
2424 //                graph.waitAsync(null);
2425
2426             }
2427         } catch (Throwable t) {
2428             t.printStackTrace();
2429
2430         }
2431     }
2432     void fireMetadataListeners(WriteGraphImpl graph, ChangeSet cs2) {
2433
2434         try {
2435
2436             // Do not fire empty events
2437             if (cs2.isEmpty())
2438                 return;
2439
2440             // Do not fire on virtual requests
2441             if(graph.getProvider() != null)
2442                 return;
2443
2444             WriteGraphImpl reactionGraph = WriteGraphImpl.create(graph.processor, writeSupport, null);
2445
2446             try {
2447
2448                 ChangeEvent e2 = new ChangeEvent(graph.getSession(), reactionGraph, graph, cs2);
2449                 for (ChangeListener l : metadataListeners) {
2450                     try {
2451                         l.graphChanged(e2);
2452                     } catch (Throwable ex) {
2453                         ex.printStackTrace();
2454                     }
2455                 }
2456
2457             } finally {
2458
2459             }
2460
2461         } catch (Throwable t) {
2462             t.printStackTrace();
2463         }
2464
2465     }
2466
2467
2468     /*
2469      * (non-Javadoc)
2470      *
2471      * @see org.simantics.db.ServiceLocator#getService(java.lang.Class)
2472      */
2473     @Override
2474     public <T> T getService(Class<T> api) {
2475         T t = peekService(api);
2476         if (t == null) {
2477             if (state.isClosed())
2478                 throw new ServiceNotFoundException(this, api, "Session has been shut down");
2479             throw new ServiceNotFoundException(this, api);
2480         }
2481         return t;
2482     }
2483
2484     /**
2485      * @return
2486      */
2487     protected abstract ServerInformation getCachedServerInformation();
2488
2489         private Class<?> serviceKey1 = null;
2490         private Class<?> serviceKey2 = null;
2491         private Object service1 = null;
2492         private Object service2 = null;
2493
2494     /*
2495      * (non-Javadoc)
2496      *
2497      * @see
2498      * fi.vtt.simantics.db.connection.ServiceLocator#peekService(java.lang.Class)
2499      */
2500     @SuppressWarnings("unchecked")
2501     @Override
2502     public synchronized <T> T peekService(Class<T> api) {
2503
2504                 if(serviceKey1 == api) {
2505                         return (T)service1;
2506                 } else if (serviceKey2 == api) {
2507                         // Promote this key
2508                         Object result = service2;
2509                         service2 = service1;
2510                         serviceKey2 = serviceKey1;
2511                         service1 = result;
2512                         serviceKey1 = api;
2513                         return (T)result;
2514                 }
2515
2516         if (Layer0.class == api)
2517                 return (T) L0;
2518         if (ServerInformation.class == api)
2519             return (T) getCachedServerInformation();
2520         else if (WriteGraphImpl.class == api)
2521             return (T) writeState.getGraph();
2522         else if (ClusterBuilder.class == api)
2523             return (T)new ClusterBuilderImpl(this, (WriteOnlySupport)writeState.getGraph().writeSupport);
2524         else if (ClusterBuilderFactory.class == api)
2525             return (T)new ClusterBuilderFactoryImpl(this);
2526
2527                 service2 = service1;
2528                 serviceKey2 = serviceKey1;
2529
2530         service1 = serviceLocator.peekService(api);
2531         serviceKey1 = api;
2532
2533         return (T)service1;
2534
2535     }
2536
2537     /*
2538      * (non-Javadoc)
2539      *
2540      * @see
2541      * fi.vtt.simantics.db.connection.ServiceLocator#hasService(java.lang.Class)
2542      */
2543     @Override
2544     public boolean hasService(Class<?> api) {
2545         return serviceLocator.hasService(api);
2546     }
2547
2548     /**
2549      * @param api the api that must be implemented by the specified service
2550      * @param service the service implementation
2551      */
2552     @Override
2553     public <T> void registerService(Class<T> api, T service) {
2554         if(Layer0.class == api) {
2555                 L0 = (Layer0)service;
2556                 return;
2557         }
2558         serviceLocator.registerService(api, service);
2559         if (TransactionPolicySupport.class == api) {
2560             transactionPolicy = (TransactionPolicySupport)service;
2561             state.resetTransactionPolicy();
2562         }
2563         if (api == serviceKey1)
2564                 service1 = service;
2565         else if (api == serviceKey2)
2566                 service2 = service;
2567     }
2568
2569     // ----------------
2570     // SessionMonitor
2571     // ----------------
2572
2573     void fireSessionVariableChange(String variable) {
2574         for (MonitorHandler h : monitorHandlers) {
2575             MonitorContext ctx = monitorContexts.getLeft(h);
2576             assert ctx != null;
2577             // SafeRunner functionality repeated here to avoid dependency.
2578             try {
2579                 h.valuesChanged(ctx);
2580             } catch (Exception e) {
2581                 Logger.defaultLogError("monitor handler notification produced the following exception", e);
2582             } catch (LinkageError e) {
2583                 Logger.defaultLogError("monitor handler notification produced a linkage error", e);
2584             }
2585         }
2586     }
2587
2588
2589     class ResourceSerializerImpl implements ResourceSerializer {
2590
2591         public long createRandomAccessId(int id) throws DatabaseException {
2592             if(id < 0)
2593                 return id;
2594             int index = ClusterTraitsBase.getResourceIndexFromResourceKey(id);
2595             long cluster = getCluster(id);
2596             if (0 == cluster)
2597                 return 0; // Better to return 0 then invalid id.
2598             long result = ClusterTraitsBase.createResourceId(cluster, index);
2599             return result;
2600         }
2601
2602         @Override
2603         public long getRandomAccessId(Resource resource) throws DatabaseException {
2604
2605             ResourceImpl resourceImpl = (ResourceImpl) resource;
2606             return createRandomAccessId(resourceImpl.id);
2607
2608         }
2609
2610         @Override
2611         public int getTransientId(Resource resource) throws DatabaseException {
2612
2613             ResourceImpl resourceImpl = (ResourceImpl) resource;
2614             return resourceImpl.id;
2615
2616         }
2617
2618         @Override
2619         public String createRandomAccessId(Resource resource)
2620         throws InvalidResourceReferenceException {
2621
2622             if(resource == null) throw new IllegalArgumentException();
2623
2624             ResourceImpl resourceImpl = (ResourceImpl) resource;
2625
2626             if (resourceImpl.id < 0) return String.valueOf(resourceImpl.id) + "_0";
2627
2628             int r;
2629             try {
2630                 r = ClusterTraits.getResourceIndexFromResourceKey(resourceImpl.id);
2631             } catch (DatabaseException e1) {
2632                 throw new InvalidResourceReferenceException(e1);
2633             }
2634             try {
2635                 // Serialize as '<resource index>_<cluster id>'
2636                 return "" + r + "_" + getCluster(resourceImpl);
2637             } catch (Throwable e) {
2638                 e.printStackTrace();
2639                 throw new InvalidResourceReferenceException(e);
2640             } finally {
2641             }
2642         }
2643
2644         public int getTransientId(long serialized) throws DatabaseException {
2645             if (serialized <= 0)
2646                 return (int)serialized;
2647             int index = ClusterTraitsBase.getResourceIndexFromResourceId(serialized);
2648             long c = ClusterTraitsBase.getClusterIdFromResourceId(serialized);
2649             ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2650             if (cluster == null)
2651                 throw new DatabaseException("Couldn't load cluster for id " + c + " for resource id " + serialized);
2652             int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), index);
2653             return key;
2654         }
2655
2656         @Override
2657         public Resource getResource(long randomAccessId) throws DatabaseException {
2658             return getResourceByKey(getTransientId(randomAccessId));
2659         }
2660
2661         @Override
2662         public Resource getResource(int transientId) throws DatabaseException {
2663             return getResourceByKey(transientId);
2664         }
2665
2666         @Override
2667         public Resource getResource(String randomAccessId)
2668         throws InvalidResourceReferenceException {
2669             try {
2670                 int i = randomAccessId.indexOf('_');
2671                 if (i == -1)
2672                     throw new InvalidResourceReferenceException("Could not parse resource id + cluster id from '"
2673                             + randomAccessId + "'");
2674                 int r = Integer.parseInt(randomAccessId.substring(0, i));
2675                 if(r < 0) return getResourceByKey(r);
2676                 long c = Long.parseLong(randomAccessId.substring(i + 1));
2677                 ClusterI cluster = clusterTranslator.getClusterByClusterId(c);
2678                 int key = ClusterTraits.createResourceKey(cluster.getClusterKey(), r);
2679                 if (cluster.hasResource(key, clusterTranslator))
2680                     return getResourceByKey(key);
2681             } catch (InvalidResourceReferenceException e) {
2682                 throw e;
2683             } catch (NumberFormatException e) {
2684                 throw new InvalidResourceReferenceException(e);
2685             } catch (Throwable e) {
2686                 e.printStackTrace();
2687                 throw new InvalidResourceReferenceException(e);
2688             } finally {
2689             }
2690             throw new InvalidResourceReferenceException("Resource does not exist " + randomAccessId);
2691         }
2692
2693         @Override
2694         public boolean disposeRandomAccessId(String randomAccessId) throws InvalidResourceReferenceException {
2695             try {
2696                 return true;
2697             } catch (Throwable e) {
2698                 e.printStackTrace();
2699                 throw new InvalidResourceReferenceException(e);
2700             } finally {
2701             }
2702         }
2703     }
2704
2705     // Copied from old SessionImpl
2706
2707     void check() {
2708         if (state.isClosed())
2709             throw new Error("Session closed.");
2710     }
2711
2712
2713
2714     public ResourceImpl getNewResource(long clusterId) {
2715         ClusterI cluster = clusterTable.getClusterByClusterIdOrThrow(clusterId);
2716         int newId;
2717         try {
2718             newId = cluster.createResource(clusterTranslator);
2719         } catch (DatabaseException e) {
2720             Logger.defaultLogError(e);
2721             return null;
2722         }
2723         return new ResourceImpl(resourceSupport, newId);
2724     }
2725
2726     public ResourceImpl getNewResource(Resource clusterSet)
2727     throws DatabaseException {
2728         long resourceId = clusterSet.getResourceId();
2729         Long clusterId = clusterSetsSupport.get(resourceId);
2730         if (null == clusterId)
2731             throw new ClusterSetExistException("Cluster set does not exist. Resource=" + clusterSet);
2732         if (Constants.NewClusterId == clusterId) {
2733             clusterId = getService(ClusteringSupport.class).createCluster();
2734             if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2735                 createdClusters.add(clusterId);
2736             clusterSetsSupport.put(resourceId, clusterId);
2737             return getNewResource(clusterId);
2738         } else {
2739             ClusterBase cb = clusterTranslator.getClusterByClusterId(clusterId);
2740             ResourceImpl result;
2741             if (cb.getNumberOfResources(clusterTranslator) >= ClusterTable.CLUSTER_FILL_SIZE) {
2742                 clusterId = getService(ClusteringSupport.class).createCluster();
2743                 if ((serviceMode & SERVICE_MODE_CREATE) > 0)
2744                     createdClusters.add(clusterId);
2745                 clusterSetsSupport.put(resourceId, clusterId);
2746                 return getNewResource(clusterId);
2747             } else {
2748                 result = getNewResource(clusterId);
2749                 int resultKey = querySupport.getId(result);
2750                 long resultCluster = querySupport.getClusterId(resultKey);
2751                 if (clusterId != resultCluster)
2752                     clusterSetsSupport.put(resourceId, resultCluster);
2753                 return result;
2754             }
2755         }
2756     }
2757
2758     public void getNewClusterSet(Resource clusterSet)
2759     throws DatabaseException {
2760         if (DEBUG)
2761             System.out.println("new cluster set=" + clusterSet);
2762         long resourceId = clusterSet.getResourceId();
2763         if (clusterSetsSupport.containsKey(resourceId))
2764             throw new ClusterSetExistException("Cluster set exist already. Resource=" + clusterSet);
2765         clusterSetsSupport.put(resourceId, Constants.NewClusterId);
2766     }
2767     public boolean containsClusterSet(Resource clusterSet)
2768     throws ServiceException {
2769         long resourceId = clusterSet.getResourceId();
2770         return clusterSetsSupport.containsKey(resourceId);
2771     }
2772     public Resource setDefaultClusterSet4NewResource(Resource clusterSet) {
2773         Resource r = defaultClusterSet;
2774         defaultClusterSet = clusterSet;
2775         return r;
2776     }
2777     void printDiagnostics() {
2778
2779         if (DIAGNOSTICS) {
2780
2781             final DataContainer<Long> totalMem = new DataContainer<Long>(0L);
2782             final DataContainer<Long> residentClusters = new DataContainer<Long>(0L);
2783             for(ClusterI cluster : clusterTable.getClusters()) {
2784                 try {
2785                     if (cluster.isLoaded() && !cluster.isEmpty()) {
2786                         residentClusters.set(residentClusters.get() + 1);
2787                         totalMem.set(totalMem.get() + cluster.getUsedSpace());
2788                     }
2789                 } catch (DatabaseException e) {
2790                     Logger.defaultLogError(e);
2791                 }
2792             }
2793
2794             System.out.println("--------------------------------");
2795             System.out.println("Cluster information:");
2796             System.out.println("-amount of resident clusters=" + residentClusters.get());
2797             System.out.println("-total memory usage=" + totalMem.get() / 1024 + "kB");
2798
2799             for(ClusterI cluster : clusterTable.getClusters()) {
2800                 System.out.print("Cluster " + cluster.getClusterId() + " [");
2801                 try {
2802                     if (!cluster.isLoaded())
2803                         System.out.println("not loaded]");
2804                     else if (cluster.isEmpty())
2805                         System.out.println("is empty]");
2806                     else {
2807                         System.out.print("resources=" + cluster.getNumberOfResources(clusterTranslator));
2808                         System.out.print(",references=" + cluster.getReferenceCount());
2809                         System.out.println(",memory=" + cluster.getUsedSpace() / 1024 + "kB]");
2810                     }
2811                 } catch (DatabaseException e) {
2812                     Logger.defaultLogError(e);
2813                     System.out.println("is corrupted]");
2814                 }
2815             }
2816
2817             queryProvider2.printDiagnostics();
2818             System.out.println("--------------------------------");
2819         }
2820
2821     }
2822
2823     public void fireStartReadTransaction() {
2824
2825         if (DIAGNOSTICS)
2826             System.out.println("StartReadTransaction");
2827
2828         for (SessionEventListener listener : eventListeners) {
2829             try {
2830                 listener.readTransactionStarted();
2831             } catch (Throwable t) {
2832                 t.printStackTrace();
2833             }
2834         }
2835
2836     }
2837
2838     public void fireFinishReadTransaction() {
2839
2840         if (DIAGNOSTICS)
2841             System.out.println("FinishReadTransaction");
2842
2843         for (SessionEventListener listener : eventListeners) {
2844             try {
2845                 listener.readTransactionFinished();
2846             } catch (Throwable t) {
2847                 t.printStackTrace();
2848             }
2849         }
2850
2851     }
2852
2853     public void fireStartWriteTransaction() {
2854
2855         if (DIAGNOSTICS)
2856             System.out.println("StartWriteTransaction");
2857
2858         for (SessionEventListener listener : eventListeners) {
2859             try {
2860                 listener.writeTransactionStarted();
2861             } catch (Throwable t) {
2862                 t.printStackTrace();
2863             }
2864         }
2865
2866     }
2867
2868     public void fireFinishWriteTransaction() {
2869
2870         if (DIAGNOSTICS)
2871             System.out.println("FinishWriteTransaction");
2872
2873         for (SessionEventListener listener : eventListeners) {
2874             try {
2875                 listener.writeTransactionFinished();
2876             } catch (Throwable t) {
2877                 t.printStackTrace();
2878             }
2879         }
2880
2881         Indexing.resetDependenciesIndexingDisabled();
2882
2883     }
2884
2885     Resource deserialize(final long resourceId, final long clusterId) throws IOException {
2886         throw new Error("Not supported at the moment.");
2887     }
2888
2889     State getState() {
2890         return state;
2891     }
2892
2893     ClusterTable getClusterTable() {
2894         return clusterTable;
2895     }
2896
2897     public GraphSession getGraphSession() {
2898         return graphSession;
2899     }
2900
2901     public QueryProcessor getQueryProvider2() {
2902         return queryProvider2;
2903     }
2904
2905     ClientChangesImpl getClientChanges() {
2906         return clientChanges;
2907     }
2908
2909     boolean getWriteOnly() {
2910         return writeOnly;
2911     }
2912
2913     static int counter = 0;
2914
2915     public void onClusterLoaded(long clusterId) {
2916
2917         clusterTable.updateSize();
2918
2919     }
2920
2921
2922
2923
2924     /*
2925      * Implementation of the interface RequestProcessor
2926      */
2927
2928     @Override
2929     public <T> T syncRequest(final Read<T> request) throws DatabaseException {
2930
2931         assertNotSession();
2932         assertAlive();
2933
2934         assert(request != null);
2935
2936         final DataContainer<T> result = new DataContainer<T>();
2937         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
2938
2939         syncRequest(request, new AsyncProcedure<T>() {
2940
2941             @Override
2942             public void execute(AsyncReadGraph graph, T t) {
2943                 result.set(t);
2944             }
2945
2946             @Override
2947             public void exception(AsyncReadGraph graph, Throwable t) {
2948                 exception.set(t);
2949             }
2950
2951         });
2952
2953         Throwable t = exception.get();
2954         if(t != null) {
2955             if(t instanceof DatabaseException) throw (DatabaseException)t;
2956             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
2957         }
2958
2959         return result.get();
2960
2961     }
2962
2963 //    @Override
2964 //    public <T> T syncRequest(Read<T> request, AsyncListener<T> procedure) throws DatabaseException {
2965 //        assertNotSession();
2966 //        return syncRequest(request, (AsyncProcedure<T>)procedure);
2967 //    }
2968
2969     @Override
2970     public <T> T syncRequest(Read<T> request, SyncListener<T> procedure) throws DatabaseException {
2971         assertNotSession();
2972         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
2973     }
2974
2975     @Override
2976     public <T> T syncRequest(Read<T> request, final Listener<T> procedure) throws DatabaseException {
2977         assertNotSession();
2978         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
2979     }
2980
2981     @Override
2982     public <T> T syncRequest(final Read<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
2983         assertNotSession();
2984         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
2985     }
2986
2987     @Override
2988     public <T> T syncRequest(Read<T> request, Procedure<T> procedure) throws DatabaseException {
2989         assertNotSession();
2990         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
2991     }
2992
2993     @Override
2994     public <T> T syncRequest(final AsyncRead<T> request) throws DatabaseException {
2995
2996         assertNotSession();
2997
2998         assert(request != null);
2999
3000         final DataContainer<T> result = new DataContainer<T>();
3001         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3002
3003         syncRequest(request, new AsyncProcedure<T>() {
3004
3005             @Override
3006             public void execute(AsyncReadGraph graph, T t) {
3007                 result.set(t);
3008             }
3009
3010             @Override
3011             public void exception(AsyncReadGraph graph, Throwable t) {
3012                 exception.set(t);
3013             }
3014
3015         });
3016
3017         Throwable t = exception.get();
3018         if(t != null) {
3019             if(t instanceof DatabaseException) throw (DatabaseException)t;
3020             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3021         }
3022
3023         return result.get();
3024
3025     }
3026
3027     @Override
3028     public <T> T syncRequest(AsyncRead<T> request, AsyncListener<T> procedure) throws DatabaseException {
3029         assertNotSession();
3030         return syncRequest(request, (AsyncProcedure<T>)procedure);
3031     }
3032
3033     @Override
3034     public <T> T syncRequest(AsyncRead<T> request, SyncListener<T> procedure) throws DatabaseException {
3035         assertNotSession();
3036         return syncRequest(request, new SyncToAsyncListener<T>(procedure));
3037     }
3038
3039     @Override
3040     public <T> T syncRequest(AsyncRead<T> request, Listener<T> procedure) throws DatabaseException {
3041         assertNotSession();
3042         return syncRequest(request, new NoneToAsyncListener<T>(procedure));
3043     }
3044
3045     @Override
3046     public <T> T syncRequest(AsyncRead<T> request, final SyncProcedure<T> procedure) throws DatabaseException {
3047         assertNotSession();
3048         return syncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3049     }
3050
3051     @Override
3052     final public <T> T syncRequest(final AsyncRead<T> request, final Procedure<T> procedure) throws DatabaseException {
3053         assertNotSession();
3054         return syncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3055     }
3056
3057     @Override
3058     public <T> Collection<T> syncRequest(MultiRead<T> request) throws DatabaseException {
3059
3060         assertNotSession();
3061
3062         assert(request != null);
3063
3064         final ArrayList<T> result = new ArrayList<T>();
3065         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3066
3067         syncRequest(request, new AsyncMultiProcedure<T>() {
3068
3069             @Override
3070             public void execute(AsyncReadGraph graph, T t) {
3071                 synchronized(result) {
3072                     result.add(t);
3073                 }
3074             }
3075
3076             @Override
3077             public void finished(AsyncReadGraph graph) {
3078             }
3079
3080             @Override
3081             public void exception(AsyncReadGraph graph, Throwable t) {
3082                 exception.set(t);
3083             }
3084
3085
3086         });
3087
3088         Throwable t = exception.get();
3089         if(t != null) {
3090             if(t instanceof DatabaseException) throw (DatabaseException)t;
3091             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3092         }
3093
3094         return result;
3095
3096     }
3097
3098     @Override
3099     public <T> Collection<T> syncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3100         assertNotSession();
3101         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3102     }
3103
3104     @Override
3105     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3106         assertNotSession();
3107         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3108     }
3109
3110     @Override
3111     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3112         assertNotSession();
3113         return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3114     }
3115
3116     @Override
3117     public <T> Collection<T> syncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) throws DatabaseException {
3118         assertNotSession();
3119         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3120     }
3121
3122     @Override
3123     public <T> Collection<T> syncRequest(MultiRead<T> request, MultiProcedure<T> procedure) throws DatabaseException {
3124         assertNotSession();
3125         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3126     }
3127
3128     @Override
3129     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request) throws DatabaseException {
3130
3131         assertNotSession();
3132
3133         assert(request != null);
3134
3135         final ArrayList<T> result = new ArrayList<T>();
3136         final DataContainer<Throwable> exception = new DataContainer<Throwable>();
3137
3138         syncRequest(request, new AsyncMultiProcedure<T>() {
3139
3140             @Override
3141             public void execute(AsyncReadGraph graph, T t) {
3142                 synchronized(result) {
3143                     result.add(t);
3144                 }
3145             }
3146
3147             @Override
3148             public void finished(AsyncReadGraph graph) {
3149             }
3150
3151             @Override
3152             public void exception(AsyncReadGraph graph, Throwable t) {
3153                 exception.set(t);
3154             }
3155
3156
3157         });
3158
3159         Throwable t = exception.get();
3160         if(t != null) {
3161             if(t instanceof DatabaseException) throw (DatabaseException)t;
3162             else throw new DatabaseException("Unexpected exception in ReadGraph.syncRequest(Read)", t);
3163         }
3164
3165         return result;
3166
3167     }
3168
3169     @Override
3170     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) throws DatabaseException {
3171         assertNotSession();
3172         return syncRequest(request, (AsyncMultiProcedure<T>)procedure);
3173     }
3174
3175     @Override
3176     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) throws DatabaseException {
3177         assertNotSession();
3178         return syncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3179     }
3180
3181     @Override
3182     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) throws DatabaseException {
3183         assertNotSession();
3184        return syncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3185     }
3186
3187     @Override
3188     public <T> Collection<T> syncRequest(AsyncMultiRead<T> request, final SyncMultiProcedure<T> procedure) throws DatabaseException {
3189         assertNotSession();
3190         return syncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3191     }
3192
3193     @Override
3194     final public <T> Collection<T> syncRequest(final AsyncMultiRead<T> request, final MultiProcedure<T> procedure) throws DatabaseException {
3195         assertNotSession();
3196         return syncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3197     }
3198
3199
3200     @Override
3201     public <T> void asyncRequest(Read<T> request) {
3202
3203         asyncRequest(request, new ProcedureAdapter<T>() {
3204             @Override
3205             public void exception(Throwable t) {
3206                 t.printStackTrace();
3207             }
3208         });
3209
3210     }
3211
3212     @Override
3213     public <T> void asyncRequest(Read<T> request, AsyncListener<T> procedure) {
3214         asyncRequest(request, (AsyncProcedure<T>)procedure);
3215     }
3216
3217     @Override
3218     public <T> void asyncRequest(Read<T> request, final SyncListener<T> procedure) {
3219         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3220     }
3221
3222     @Override
3223     public <T> void asyncRequest(Read<T> request, final Listener<T> procedure) {
3224         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3225     }
3226
3227     @Override
3228     public <T> void asyncRequest(Read<T> request, SyncProcedure<T> procedure) {
3229         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3230     }
3231
3232     @Override
3233     public <T> void asyncRequest(Read<T> request, final Procedure<T> procedure) {
3234         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3235     }
3236
3237     @Override
3238     final public <T> void asyncRequest(final AsyncRead<T> request) {
3239
3240         assert(request != null);
3241
3242         asyncRequest(request, new ProcedureAdapter<T>() {
3243             @Override
3244             public void exception(Throwable t) {
3245                 t.printStackTrace();
3246             }
3247         });
3248
3249     }
3250
3251     @Override
3252     public <T> void asyncRequest(AsyncRead<T> request, AsyncListener<T> procedure) {
3253         scheduleRequest(request, procedure, procedure, null);
3254     }
3255
3256     @Override
3257     public <T> void asyncRequest(AsyncRead<T> request, final SyncListener<T> procedure) {
3258         asyncRequest(request, new SyncToAsyncListener<T>(procedure));
3259     }
3260
3261     @Override
3262     public <T> void asyncRequest(AsyncRead<T> request, final Listener<T> procedure) {
3263         asyncRequest(request, new NoneToAsyncListener<T>(procedure));
3264     }
3265
3266     @Override
3267     public <T> void asyncRequest(AsyncRead<T> request, SyncProcedure<T> procedure) {
3268         asyncRequest(request, new SyncToAsyncProcedure<T>(procedure));
3269     }
3270
3271     @Override
3272     final public <T> void asyncRequest(final AsyncRead<T> request, final Procedure<T> procedure) {
3273         asyncRequest(request, new NoneToAsyncProcedure<T>(procedure));
3274     }
3275
3276     @Override
3277     public <T> void asyncRequest(MultiRead<T> request) {
3278
3279         assert(request != null);
3280
3281         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3282             @Override
3283             public void exception(AsyncReadGraph graph, Throwable t) {
3284                 t.printStackTrace();
3285             }
3286         });
3287
3288     }
3289
3290     @Override
3291     public <T> void asyncRequest(MultiRead<T> request, AsyncMultiListener<T> procedure) {
3292         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3293     }
3294
3295     @Override
3296     public <T> void asyncRequest(MultiRead<T> request, SyncMultiListener<T> procedure) {
3297         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3298     }
3299
3300     @Override
3301     public <T> void asyncRequest(MultiRead<T> request, MultiListener<T> procedure) {
3302         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3303     }
3304
3305     @Override
3306     public <T> void asyncRequest(MultiRead<T> request, SyncMultiProcedure<T> procedure) {
3307         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3308     }
3309
3310     @Override
3311     public <T> void asyncRequest(MultiRead<T> request, MultiProcedure<T> procedure) {
3312         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3313     }
3314
3315     @Override
3316     final public <T> void asyncRequest(AsyncMultiRead<T> request) {
3317
3318         assert(request != null);
3319
3320         asyncRequest(request, new AsyncMultiProcedureAdapter<T>() {
3321             @Override
3322             public void exception(AsyncReadGraph graph, Throwable t) {
3323                 t.printStackTrace();
3324             }
3325         });
3326
3327     }
3328
3329     @Override
3330     public <T> void asyncRequest(AsyncMultiRead<T> request, AsyncMultiListener<T> procedure) {
3331         asyncRequest(request, (AsyncMultiProcedure<T>)procedure);
3332     }
3333
3334     @Override
3335     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiListener<T> procedure) {
3336         asyncRequest(request, new SyncToAsyncMultiListener<T>(procedure));
3337     }
3338
3339     @Override
3340     public <T> void asyncRequest(AsyncMultiRead<T> request, MultiListener<T> procedure) {
3341         asyncRequest(request, new NoneToAsyncMultiListener<T>(procedure));
3342     }
3343
3344     @Override
3345     public <T> void asyncRequest(AsyncMultiRead<T> request, SyncMultiProcedure<T> procedure) {
3346         asyncRequest(request, new SyncToAsyncMultiProcedure<T>(procedure));
3347     }
3348
3349     @Override
3350     final public <T> void asyncRequest(AsyncMultiRead<T> request, final MultiProcedure<T> procedure) {
3351         asyncRequest(request, new NoneToAsyncMultiProcedure<T>(procedure));
3352     }
3353
3354     @Override
3355     public <T> Collection<T> syncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) throws DatabaseException {
3356         assertNotSession();
3357         throw new Error("Not implemented!");
3358     }
3359
3360     @Override
3361     public <T> void asyncRequest(MultiRead<T> arg0, AsyncMultiProcedure<T> arg1) {
3362         throw new Error("Not implemented!");
3363     }
3364
3365     @Override
3366     final public <T> void asyncRequest(final ExternalRead<T> request) {
3367
3368         assert(request != null);
3369
3370         asyncRequest(request, new ProcedureAdapter<T>() {
3371             @Override
3372             public void exception(Throwable t) {
3373                 t.printStackTrace();
3374             }
3375         });
3376
3377     }
3378
3379     @Override
3380     public <T> void asyncRequest(ExternalRead<T> request, final Listener<T> procedure) {
3381         asyncRequest(request, (Procedure<T>)procedure);
3382     }
3383
3384
3385
3386     void check(Throwable t) throws DatabaseException {
3387         if(t != null) {
3388             if(t instanceof DatabaseException) throw (DatabaseException)t;
3389             else throw new DatabaseException("Unexpected exception", t);
3390         }
3391     }
3392
3393     void check(DataContainer<Throwable> container) throws DatabaseException {
3394         Throwable t = container.get();
3395         if(t != null) {
3396             if(t instanceof DatabaseException) throw (DatabaseException)t;
3397             else throw new DatabaseException("Unexpected exception", t);
3398         }
3399     }
3400
3401
3402
3403
3404
3405
3406     boolean sameProvider(Write request) {
3407         if(writeState.getGraph().provider != null) {
3408             return writeState.getGraph().provider.equals(request.getProvider());
3409         } else {
3410             return request.getProvider() == null;
3411         }
3412     }
3413
3414     boolean plainWrite(WriteGraphImpl graph) {
3415         if(graph == null) return false;
3416         if(graph.writeSupport.writeOnly()) return false;
3417         return true;
3418     }
3419
3420
3421     public static final ThreadGroup SessionThreadGroup = new ThreadGroup("Session Thread Group");
3422     private void assertNotSession() throws DatabaseException {
3423         Thread current = Thread.currentThread();
3424         if(sessionThreads.contains(current)) throw new DatabaseException("Caller is already inside a transaction.");
3425     }
3426
3427     void assertAlive() {
3428         if (!state.isAlive())
3429             throw new RuntimeDatabaseException("Session has been shut down.");
3430     }
3431
3432     public InputStream getValueStream(ReadGraphImpl graph, Resource resource) {
3433         return querySupport.getValueStream(graph, querySupport.getId(resource));
3434     }
3435
3436     public byte[] getValue(ReadGraphImpl graph, Resource resource) {
3437         return querySupport.getValue(graph, querySupport.getId(resource));
3438     }
3439
3440     <T> void acquire(Semaphore semaphore, T request) throws DatabaseException {
3441         acquire(semaphore, request, null);
3442     }
3443
3444     private <T,P> void acquire(Semaphore semaphore, T request, P procedure) throws DatabaseException {
3445         assertAlive();
3446         try {
3447             long tryCount = 0;
3448             // Loop until the semaphore is acquired reporting requests that take a long time.
3449             while (true) {
3450
3451                 if (semaphore.tryAcquire(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD, DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT)) {
3452                     // Acquired OK.
3453                     return;
3454                 } else {
3455                     if (DebugPolicy.REPORT_TIME_CONSUMING_REQUESTS) {
3456                         ++tryCount;
3457                         long waitTime = DebugPolicy.LONG_EXECUTION_REPORT_PERIOD_UNIT
3458                                 .toMillis(DebugPolicy.LONG_EXECUTION_REPORT_PERIOD)
3459                                 * tryCount;
3460                         System.err.println("DB client request '" + request + "' is taking long to execute, so far "
3461                                 + (waitTime) + " ms. (procedure=" + procedure + ")");
3462                     }
3463                 }
3464
3465                 assertAlive();
3466
3467             }
3468         } catch (InterruptedException e) {
3469             e.printStackTrace();
3470             // FIXME: Should perhaps do something else in this case ??
3471         }
3472     }
3473
3474
3475
3476 //    @Override
3477 //    public boolean holdOnToTransactionAfterCancel() {
3478 //        return transactionPolicy.holdOnToTransactionAfterCancel();
3479 //    }
3480 //
3481 //    @Override
3482 //    public boolean holdOnToTransactionAfterCommit() {
3483 //        return transactionPolicy.holdOnToTransactionAfterCommit();
3484 //    }
3485 //
3486 //    @Override
3487 //    public boolean holdOnToTransactionAfterRead() {
3488 //        return transactionPolicy.holdOnToTransactionAfterRead();
3489 //    }
3490 //
3491 //    @Override
3492 //    public void onRelinquish() {
3493 //        transactionPolicy.onRelinquish();
3494 //    }
3495 //
3496 //    @Override
3497 //    public void onRelinquishDone() {
3498 //        transactionPolicy.onRelinquishDone();
3499 //    }
3500 //
3501 //    @Override
3502 //    public void onRelinquishError() {
3503 //        transactionPolicy.onRelinquishError();
3504 //    }
3505
3506
3507     @Override
3508     public Session getSession() {
3509         return this;
3510     }
3511
3512
3513     protected abstract VirtualGraph getProvider(VirtualGraph virtualGraph);
3514     protected abstract ResourceImpl getNewResource() throws DatabaseException;
3515
3516     public void ceased(int thread) {
3517
3518         requestManager.ceased(thread);
3519
3520     }
3521
3522     public int getAmountOfQueryThreads() {
3523         // This must be a power of two
3524         return 4;
3525 //        return Integer.highestOneBit(Runtime.getRuntime().availableProcessors());
3526     }
3527
3528     public Resource getResourceByKey(int key) throws ResourceNotFoundException {
3529
3530         return new ResourceImpl(resourceSupport, key);
3531
3532     }
3533
3534     public void acquireWriteOnly() {
3535         writeOnly = true;
3536     }
3537
3538     public void releaseWriteOnly(ReadGraphImpl graph) {
3539         writeOnly = false;
3540         queryProvider2.releaseWrite(graph);
3541     }
3542
3543     public void handleUpdatesAndMetadata(WriteGraphImpl writer) {
3544
3545         long start = System.nanoTime();
3546
3547         while(dirtyPrimitives) {
3548             dirtyPrimitives = false;
3549             getQueryProvider2().performDirtyUpdates(writer);
3550             getQueryProvider2().performScheduledUpdates(writer);
3551         }
3552
3553         fireMetadataListeners(writer, clientChanges);
3554
3555         if(DebugPolicy.PERFORMANCE_DATA) {
3556             long end = System.nanoTime();
3557             System.err.println("handleUpdatesAndMetadata " + 1e-9*(end-start));
3558         }
3559
3560     }
3561
3562     void removeTemporaryData() {
3563         File platform = Platform.getLocation().toFile();
3564         File tempFiles = new File(platform, "tempFiles");
3565         File temp = new File(tempFiles, "db");
3566         if(!temp.exists()) 
3567             return;
3568         try {
3569             Files.walkFileTree(temp.toPath(), new SimpleFileVisitor<Path>() {
3570                 @Override
3571                 public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
3572                     try {
3573                         Files.delete(file);
3574                     } catch (IOException e) {
3575                         Logger.defaultLogError(e);
3576                     }
3577                     return FileVisitResult.CONTINUE;
3578                 }
3579             });
3580         } catch (IOException e) {
3581             Logger.defaultLogError(e);
3582         }
3583     }
3584
3585     public void handleCreatedClusters() {
3586         if ((serviceMode & SERVICE_MODE_CREATE) > 0) {
3587             createdClusters.forEach(new TLongProcedure() {
3588
3589                 @Override
3590                 public boolean execute(long value) {
3591                     ClusterImpl cluster = clusterTable.getClusterByClusterId(value);
3592                     cluster.setImmutable(true, clusterTranslator);
3593                     return true;
3594                 }
3595             });
3596         }
3597         createdClusters.clear();
3598     }
3599
3600     @Override
3601     public Object getModificationCounter() {
3602         return queryProvider2.modificationCounter;
3603     }
3604     
3605     @Override
3606     public void markUndoPoint() {
3607         state.setCombine(false);
3608     }
3609
3610 }